You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/04/12 00:21:30 UTC

[4/7] git commit: Checkpoint for shard allocation and layout of classes. Need to revamp time uuid utils as well as counter impl

Checkpoint for shard allocation and layout of classes. Need to revamp time uuid utils as well as counter impl


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c178fc65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c178fc65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c178fc65

Branch: refs/heads/asyncqueue
Commit: c178fc65efbd286436bb559a82feccd7797576ac
Parents: ed2e63b
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Apr 4 14:17:11 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Apr 4 14:17:11 2014 -0700

----------------------------------------------------------------------
 .../persistence/graph/impl/Constants.java       |  43 ++++++
 .../graph/impl/cache/NodeShardAllocation.java   |  40 ++---
 .../impl/cache/NodeShardAllocationImpl.java     | 145 +++++++++++++++++++
 .../graph/impl/cache/NodeShardCache.java        |   3 +-
 .../graph/impl/cache/NodeShardCacheImpl.java    |  11 +-
 .../EdgeSeriesCounterSerialization.java         |  36 +++--
 .../serialization/EdgeSeriesSerialization.java  |   7 +-
 7 files changed, 247 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c178fc65/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/Constants.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/Constants.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/Constants.java
new file mode 100644
index 0000000..45ce323
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/Constants.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import java.util.UUID;
+
+
+/**
+ * Simple static class for re-used constants
+ */
+public class Constants {
+
+    /**
+     * The minimum time UUID value
+     */
+    public static final UUID MIN_UUID =  new UUID( 0, 1 );
+
+    /**
+     * The maximum time UUID value
+     */
+    public static final UUID MAX_UUID = UUID.fromString( "ffffffff-ffff-1fff-bfff-ffffffffffff" );
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c178fc65/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocation.java
index 8bc4f6c..ad4b330 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocation.java
@@ -27,19 +27,23 @@ import org.apache.usergrid.persistence.collection.OrganizationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 
-
+/**
+ * Interface used to create and retrieve shards
+ */
 public interface NodeShardAllocation {
 
-    public static final UUID MIN_UUID =  new UUID( 0, 1 );
+
 
     /**
      * Get all shards for the given info.  If none exist, a default shard should be allocated
      * @param scope
      * @param nodeId
+     * @param maxShardId The max value to start seeking from.  Values <= this will be returned
+     * @param count The count of elements to return
      * @param edgeTypes
      * @return A list of all shards <= the current shard.  This will always return MIN_UUID if no shards are allocated
      */
-    public List<UUID> getShards(final OrganizationScope scope, final Id nodeId, final String... edgeTypes);
+    public List<UUID> getShards(final OrganizationScope scope, final Id nodeId, UUID maxShardId, int count,  final String... edgeTypes);
 
 
     /**
@@ -52,30 +56,16 @@ public interface NodeShardAllocation {
      */
     public boolean auditMaxShard(final OrganizationScope scope, final Id nodeId, final String... edgeType);
 
-    /**
-        * The minimum uuid we allocate
-        */
-//       private static final UUID MIN_UUID =  new UUID( 0, 1 );
-//
 
     /**
-     *
-              * There are no shards allocated, allocate the minimum shard
-              */
-    /**
-     *
-     *
-             if(shards == null || shards.size() == 0){
-
-                 try {
-                     edgeSeriesSerialization.writeEdgeMeta( scope, nodeId, MIN_UUID, edgeType ).execute();
-                 }
-                 catch ( ConnectionException e ) {
-                     throw new RuntimeException("Unable to write edge meta data", e);
-                 }
+     * Increment the count for the shard Id by the specified amount
+     * @param scope The scope
+     * @param nodeId The node id
+     * @param shardId The shard id
+     * @param count The count
+     * @param edgeType The edge type
+     */
+    public void increment(final OrganizationScope scope, final Id nodeId, final UUID shardId, int count, final String... edgeType);
 
-                 return MIN_UUID;
-              }
 
-     (**/
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c178fc65/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocationImpl.java
new file mode 100644
index 0000000..d03ccaa
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardAllocationImpl.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl.cache;
+
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSeriesCounterSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSeriesSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.impl.Constants.MAX_UUID;
+
+
+/**
+ * Implementation of the node shard monitor and allocation
+ */
+public class NodeShardAllocationImpl implements NodeShardAllocation {
+
+
+    private final EdgeSeriesSerialization edgeSeriesSerialization;
+    private final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization;
+    private final GraphFig graphFig;
+
+
+    @Inject
+    public NodeShardAllocationImpl( final EdgeSeriesSerialization edgeSeriesSerialization,
+                                    final EdgeSeriesCounterSerialization edgeSeriesCounterSerialization,
+                                    final GraphFig graphFig ) {
+        this.edgeSeriesSerialization = edgeSeriesSerialization;
+        this.edgeSeriesCounterSerialization = edgeSeriesCounterSerialization;
+        this.graphFig = graphFig;
+    }
+
+
+    @Override
+    public List<UUID> getShards( final OrganizationScope scope, final Id nodeId, final UUID maxShardId, final int count,
+                                 final String... edgeTypes ) {
+        return edgeSeriesSerialization.getEdgeMetaData( scope, nodeId, maxShardId, count, edgeTypes );
+    }
+
+
+    @Override
+    public boolean auditMaxShard( final OrganizationScope scope, final Id nodeId, final String... edgeType ) {
+
+        final UUID now = UUIDGenerator.newTimeUUID();
+
+        List<UUID> maxShards = getShards( scope, nodeId, MAX_UUID, 1, edgeType );
+
+        //if the first shard has already been allocated, do nothing.
+
+        //now is already < the max allocated shard, don't do anything
+        if ( maxShards.size() > 0 && UUIDComparator.staticCompare( now, maxShards.get( 0 ) ) < 0 ) {
+            return false;
+        }
+
+        //allocate a new shard
+        //TODO T.N. modify the time uuid utils to allow future generation, this is incorrect, but a place holder
+        final UUID futureUUID = UUIDGenerator.newTimeUUID();
+
+        try {
+            this.edgeSeriesSerialization.writeEdgeMeta( scope, nodeId, futureUUID, edgeType ).execute();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to write the new edge metadata" );
+        }
+
+        UUID max = null;
+
+
+        MutationBatch rollup = null;
+        boolean completed = false;
+
+        //TODO, change this into an iterator
+        while ( !completed ) {
+
+            List<UUID> shards = getShards( scope, nodeId, MAX_UUID, 100, edgeType );
+
+            for ( UUID shardId : shards ) {
+                if ( UUIDComparator.staticCompare( shardId, max ) >= 0 ) {
+                    completed = true;
+                    break;
+                }
+
+
+                final MutationBatch batch = edgeSeriesSerialization.removeEdgeMeta( scope, nodeId, shardId, edgeType );
+
+                if ( rollup == null ) {
+                    rollup = batch;
+                }
+                else {
+                    rollup.mergeShallow( batch );
+                }
+            }
+
+
+            //while our max value is greater than the value we just created, delete it
+        }
+
+        if(rollup != null){
+            try {
+                rollup.execute();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to cleanup allocated shards" );
+            }
+        }
+
+        return true;
+    }
+
+
+    @Override
+    public void increment( final OrganizationScope scope, final Id nodeId, final UUID shardId, final int count,
+                           final String... edgeType ) {
+        //delegate
+        edgeSeriesCounterSerialization.incrementMetadataCount( scope, nodeId, shardId, count, edgeType );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c178fc65/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCache.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCache.java
index e73834d..251ee30 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCache.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCache.java
@@ -26,7 +26,8 @@ import org.apache.usergrid.persistence.model.entity.Id;
 
 
 /**
- *  Cache implementation for returning versions based on the slice.
+ *  Cache implementation for returning versions based on the slice.  This cache may be latent.  As a result
+ *  the allocation of new shards should be 2*cache timeout in the future.
  *
  */
 public interface NodeShardCache {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c178fc65/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheImpl.java
index 1c373af..943bf5f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/cache/NodeShardCacheImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.usergrid.persistence.graph.impl.cache;
 
+import static org.apache.usergrid.persistence.graph.impl.Constants.*;
 
 import java.beans.PropertyChangeEvent;
 import java.beans.PropertyChangeListener;
@@ -51,6 +52,10 @@ public class NodeShardCacheImpl implements NodeShardCache {
 
 
 
+    private static final int MAX_SHARD_COUNT = 10000;
+
+
+
     private final NodeShardAllocation nodeShardAllocation;
     private final GraphFig graphFig;
 
@@ -128,12 +133,14 @@ public class NodeShardCacheImpl implements NodeShardCache {
 
                                       @Override
                                       public CacheEntry load( final CacheKey key ) throws Exception {
+                                          //doing this with a static size could result in lost "old" shards, this needs to be an
+                                          //iterating cache that can refresh if we have a full size
                                           final List<UUID> edges = nodeShardAllocation.getShards( key.scope,
-                                                  key.id, key.types );
+                                                  key.id, MAX_UUID, MAX_SHARD_COUNT, key.types );
 
 
                                           /**
-                                           * Perform an async audit
+                                           * Perform an async audit in case we need to allocate a new shard
                                            */
                                           nodeShardAllocation.auditMaxShard( key.scope, key.id, key.types );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c178fc65/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesCounterSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesCounterSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesCounterSerialization.java
index f7855fc..42a0434 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesCounterSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesCounterSerialization.java
@@ -29,6 +29,25 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import com.netflix.astyanax.MutationBatch;
 
 
+/**
+ * The interface to define counter operations.  Note that the implementation may not be immediately consistent.
+ *
+ * TODO: Investigate this further with HyperLogLog.  Since our cardinality needs to be "good enough", we may be able
+ * to offer much better performance than the Cassandra counters by using HyperLogLog on each node, and persisting its map
+ * in memory with periodic flush into a standard CF.  On query, we can read a unioned column.
+ * On flush, we can flush, then read+union and set the timestamp on the column so that only 1 union will be the max.
+ *
+ * http://blog.aggregateknowledge.com/2012/10/25/sketch-of-the-day-hyperloglog-cornerstone-of-a-big-data-infrastructure/
+ *
+ * See also
+ *
+ * http://blog.aggregateknowledge.com/2012/10/25/sketch-of-the-day-hyperloglog-cornerstone-of-a-big-data-infrastructure/
+ *
+ * See also
+ *
+ * https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLog.java
+ *
+ */
 public interface EdgeSeriesCounterSerialization {
 
     /**
@@ -40,18 +59,17 @@ public interface EdgeSeriesCounterSerialization {
      */
     public MutationBatch incrementMetadataCount( OrganizationScope scope, Id nodeId, UUID slice, int count,  String... types );
 
-//    /**
-//     * Get an iterator of all meta data and types
-//     * @param scope The organization scope
-//     * @param nodeId The id of the node
-//     * @param types The types to use
-//     * @return
-//     */
-//    public List<UUID> getEdgeMetadataCount( OrganizationScope scope, Id nodeId, String... types );
+    /**
+     * Get the current count of edge meta data for the given node and types
+     * @param scope The organization scope
+     * @param nodeId The id of the node
+     * @param types The types to use
+     * @return The current count
+     */
+    public long getEdgeMetadataCount( OrganizationScope scope, Id nodeId, String... types );
 
     /**
      * Remove the slice from the edge meta data from the types.
-
      * @param scope
      * @param nodeId
      * @param slice

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c178fc65/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
index 7388607..afa0041 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeSeriesSerialization.java
@@ -30,6 +30,9 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import com.netflix.astyanax.MutationBatch;
 
 
+/**
+ * The interface for creating and retrieving time series row keys
+ */
 public interface EdgeSeriesSerialization {
 
     /**
@@ -45,10 +48,12 @@ public interface EdgeSeriesSerialization {
      * Get an iterator of all meta data and types
      * @param scope The organization scope
      * @param nodeId The id of the node
+     * @param start The uuid to start seeking from.  Values <= this value will be returned.
+     * @param count The maximum size to return
      * @param types The types to use
      * @return
      */
-    public List<UUID> getEdgeMetaData(OrganizationScope scope, Id nodeId, String... types);
+    public List<UUID> getEdgeMetaData(OrganizationScope scope, Id nodeId, UUID start, int count, String... types);
 
     /**
      * Remove the slice from the edge meta data from the types.