You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/29 19:34:02 UTC
[04/20] Implemented New rolling shard algorithm. This should allow
eventual shard consistency between all clients without the need for an
external locking allocation system
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
new file mode 100644
index 0000000..9bd9937
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Class that represents an edge row key
+ */
+public class RowKey {
+ public final Id nodeId;
+ public final String edgeType;
+ public final long shardId;
+
+
+ /**
+ * Create a row key with the node and the edgeType
+ */
+ public RowKey( Id nodeId, String edgeType, final long shardId ) {
+ this.nodeId = nodeId;
+ this.edgeType = edgeType;
+ this.shardId = shardId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
new file mode 100644
index 0000000..6e69bbf
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * The row key with the additional type
+ */
+public class RowKeyType extends RowKey {
+
+ public final String idType;
+
+ /**
+ * Create a row key with the node id in the row key, the edge type, and the type from the typeid
+ *
+ * @param nodeId The node id in the row key
+ * @param edgeType The type of the edge
+ * @param typeId The type of hte id
+ */
+ public RowKeyType( final Id nodeId, final String edgeType, final Id typeId, final long shardId ) {
+ this( nodeId, edgeType, typeId.getType(), shardId );
+ }
+
+
+ /**
+ * Create a row key with the node id in the row key, the edge type, and the type from the typeid
+ *
+ * @param nodeId The node id in the row key
+ * @param edgeType The type of the edge
+ * @param typeId The type of hte id
+ */
+ public RowKeyType( final Id nodeId, final String edgeType, final String typeId, final long shardId ) {
+ super( nodeId, edgeType, shardId );
+ this.idType = typeId;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
new file mode 100644
index 0000000..38fe51c
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+public class Shard implements Comparable<Shard> {
+
+ private final long shardIndex;
+ private final long createdTime;
+ private final boolean compacted;
+
+
+ public Shard( final long shardIndex, final long createdTime, final boolean compacted ) {
+ this.shardIndex = shardIndex;
+ this.createdTime = createdTime;
+ this.compacted = compacted;
+ }
+
+
+ /**
+ * Get the long shard index
+ */
+ public long getShardIndex() {
+ return shardIndex;
+ }
+
+
+ /**
+ * Get the timestamp in epoch millis this shard was created
+ */
+ public long getCreatedTime() {
+ return createdTime;
+ }
+
+
+ /**
+ * Return true if this shard has been compacted
+ */
+ public boolean isCompacted() {
+ return compacted;
+ }
+
+
+ /**
+ * Compare the shards based on the timestamp first, then the created time second
+ */
+ @Override
+ public int compareTo( final Shard o ) {
+ if ( o == null ) {
+ return 1;
+ }
+
+ if ( shardIndex > o.shardIndex ) {
+ return 1;
+ }
+
+ else if ( shardIndex == o.shardIndex ) {
+ if ( createdTime > o.createdTime ) {
+ return 1;
+ }
+ else if ( createdTime < o.createdTime ) {
+ return -1;
+ }
+
+ else {
+
+ //kind of arbitrary compacted takes precedence
+ if ( compacted && !o.compacted ) {
+ return 1;
+ }
+
+ else if ( !compacted && o.compacted ){
+ return -1;
+ }
+
+
+ }
+ return 0;
+ }
+
+ return -1;
+ }
+
+
+ @Override
+ public boolean equals( final Object o ) {
+ if ( this == o ) {
+ return true;
+ }
+ if ( o == null || getClass() != o.getClass() ) {
+ return false;
+ }
+
+ final Shard shard = ( Shard ) o;
+
+ if ( compacted != shard.compacted ) {
+ return false;
+ }
+ if ( createdTime != shard.createdTime ) {
+ return false;
+ }
+ if ( shardIndex != shard.shardIndex ) {
+ return false;
+ }
+
+ return true;
+ }
+
+
+ @Override
+ public int hashCode() {
+ int result = ( int ) ( shardIndex ^ ( shardIndex >>> 32 ) );
+ result = 31 * result + ( int ) ( createdTime ^ ( createdTime >>> 32 ) );
+ result = 31 * result + ( compacted ? 1 : 0 );
+ return result;
+ }
+
+
+ @Override
+ public String toString() {
+ return "Shard{" +
+ "shardIndex=" + shardIndex +
+ ", createdTime=" + createdTime +
+ "} ";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
new file mode 100644
index 0000000..59ea9fb
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * There are cases where we need to read or write to more than 1 shard. This object encapsulates a set of shards that
+ * should be written to and read from. All reads should combine the data sets from all shards in the group, and writes
+ * should be written to each shard. Once the shard can safely be compacted a background process should be triggered to
+ * remove additional shards and make seeks faster. This multiread/write should only occur during the time period of the
+ * delta (in milliseconds), after which the next read will asynchronously compact the shards into a single shard.
+ */
+public class ShardEntryGroup {
+
+
+ private List<Shard> shards;
+
+ private final long delta;
+
+ private long maxCreatedTime;
+
+ private Shard compactionTarget;
+
+
+ /**
+ * The max delta we accept in milliseconds for create time to be considered a member of this group
+ */
+ public ShardEntryGroup( final long delta ) {
+ Preconditions.checkArgument( delta > 0, "delta must be greater than 0" );
+ this.delta = delta;
+ this.shards = new ArrayList<>();
+ this.maxCreatedTime = 0;
+ }
+
+
+ /**
+ * Only add a shard if it is within the rules require to meet a group. The rules are outlined below.
+ *
+ * Case 1) First shard in the group, always added
+ *
+ * Case 2) Shard is unmerged, it should be included with it's peers since other nodes may not have it yet
+ *
+ * Case 3) The list contains only non compacted shards, and this is last and and merged. It is considered a lower
+ * bound
+ */
+ public boolean addShard( final Shard shard ) {
+
+ Preconditions.checkNotNull( "shard cannot be null", shard );
+
+ final int size = shards.size();
+
+ if ( size == 0 ) {
+ addShardInternal( shard );
+ return true;
+ }
+
+ final Shard minShard = shards.get( size - 1 );
+
+ Preconditions.checkArgument( minShard.compareTo( shard ) > 0, "shard must be less than the current max" );
+
+ //shard is not compacted, or it's predecessor isn't, we should include it in this group
+ if ( !minShard.isCompacted() ) {
+ addShardInternal( shard );
+ return true;
+ }
+
+
+ return false;
+ }
+
+
+ /**
+ * Add the shard and set the min created time
+ */
+ private void addShardInternal( final Shard shard ) {
+ shards.add( shard );
+
+ maxCreatedTime = Math.max( maxCreatedTime, shard.getCreatedTime() );
+
+ //we're changing our structure, unset the compaction target
+ compactionTarget = null;
+ }
+
+
+ /**
+ * Return the minum shard based on time indexes
+ */
+ public Shard getMinShard() {
+ final int size = shards.size();
+
+ if ( size < 1 ) {
+ return null;
+ }
+
+ return shards.get( size - 1 );
+ }
+
+
+ /**
+ * Get the entries that we should read from.
+ */
+ public Collection<Shard> getReadShards() {
+ return shards;
+ }
+
+
+ /**
+ * Get the entries, with the max shard time being first. We write to all shards until they're migrated
+ */
+ public Collection<Shard> getWriteShards( long currentTime ) {
+
+ /**
+ * The shards in this set can be combined, we should only write to the compaction target to avoid
+ * adding data to other shards
+ */
+ if ( !isTooSmallToCompact() && shouldCompact( currentTime ) ) {
+ return Collections.singleton( getCompactionTarget() );
+ }
+
+
+ return shards;
+ }
+
+
+ /**
+ * Return true if we have a pending compaction
+ */
+ public boolean isCompactionPending() {
+ return !isTooSmallToCompact();
+ }
+
+
+ /**
+ * Get the shard all compactions should write to. Null indicates we cannot find a shard that could be used as a
+ * compaction target. Note that this shard may not have surpassed the delta yet You should invoke "shouldCompact"
+ * first to ensure all criteria are met before initiating compaction
+ */
+ public Shard getCompactionTarget() {
+
+ if ( compactionTarget != null ) {
+ return compactionTarget;
+ }
+
+
+ //we have < 2 shards, we can't compact
+ if ( isTooSmallToCompact() ) {
+ return null;
+ }
+
+
+ final int lastIndex = shards.size() - 1;
+
+ final Shard last = shards.get( lastIndex );
+
+ //Our oldest isn't compacted. As a result we have no "bookend" to delimit this entry group. Therefore we
+ // can't compact
+ if ( !last.isCompacted() ) {
+ return null;
+ }
+
+ //Start seeking from the end of our group. The first shard we encounter that is not compacted is our
+ // compaction target
+ //NOTE: This does not mean we can compact, rather it's just an indication that we have a target set.
+ for ( int i = lastIndex - 1; i > -1; i-- ) {
+ final Shard compactionCandidate = shards.get( i );
+
+
+ if ( !compactionCandidate.isCompacted() ) {
+ compactionTarget = compactionCandidate;
+ break;
+ }
+ }
+
+ return compactionTarget;
+ }
+
+
+ /**
+ * Return the number of entries in this shard group
+ */
+ public int entrySize() {
+ return shards.size();
+ }
+
+
+ /**
+ * Return true if there are not enough elements in this entry group to consider compaction
+ */
+ private boolean isTooSmallToCompact() {
+ return shards.size() < 2;
+ }
+
+
+ /**
+ * Returns true if the newest created shard is path the currentTime - delta
+ *
+ * @param currentTime The current system time in milliseconds
+ *
+ * @return True if these shards can safely be combined into a single shard, false otherwise
+ */
+ public boolean shouldCompact( final long currentTime ) {
+
+ /**
+ * We don't have enough shards to compact, ignore
+ */
+ return getCompactionTarget() != null
+
+
+ /**
+ * If something was created within the delta time frame, not everyone may have seen it due to
+ * cache refresh, we can't compact yet.
+ */
+
+ && currentTime - delta > maxCreatedTime;
+ }
+
+
+ /**
+ * Return true if this shard can be deleted AFTER all of the data in it has been moved
+ */
+ public boolean canBeDeleted( final Shard shard ) {
+ //if we're a neighbor shard (n-1) or the target compaction shard, we can't be deleted
+ //we purposefully use shard index comparison over .equals here, since 2 shards might have the same index with
+ // different timestamps
+ // (unlikely but could happen)
+
+ final Shard compactionTarget = getCompactionTarget();
+
+
+ return !shard.isCompacted() && ( compactionTarget != null && compactionTarget.getShardIndex() != shard
+ .getShardIndex() );
+ }
+
+
+ /**
+ * Helper method to create a shard entry group with a single shard
+ */
+ public static ShardEntryGroup singletonGroup( final Shard shard, final long delta ) {
+ ShardEntryGroup group = new ShardEntryGroup( delta );
+ group.addShard( shard );
+
+ return group;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
new file mode 100644
index 0000000..13c5596
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import rx.Observable;
+
+
+/**
+ * Defines tasks for running compaction
+ *
+ *
+ */
+public interface ShardGroupCompaction {
+
+
+ /**
+ * Execute the compaction task. Will return the number of edges that have
+ * @param group The shard entry group to compact
+ * @return The number of edges that are now compacted into the target shard
+ */
+ public Observable<Integer> compact(ShardEntryGroup group);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
new file mode 100644
index 0000000..d1ad18d
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+
+import com.netflix.astyanax.MutationBatch;
+
+
+/**
+ * Performs serialization on the shards
+ */
+public interface ShardedEdgeSerialization {
+
+ /**
+ * EdgeWrite both the source--->Target edge and the target <----- source edge into the mutation
+ *
+ * @param columnFamilies The column families to use
+ * @param scope The org scope of the graph
+ * @param markedEdge The edge to write
+ * @param timestamp The timestamp to use
+ */
+ MutationBatch writeEdge( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+ UUID timestamp );
+
+ /**
+ * EdgeWrite both the source -->target edge and the target<--- source edge into the mutation
+ *
+ * @param columnFamilies The column families to use
+ * @param scope The org scope of the graph
+ * @param markedEdge The edge to write
+ * @param timestamp The timestamp of the uuid
+ */
+ MutationBatch deleteEdge( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+ UUID timestamp );
+
+
+ /**
+ * Search for all versions of this edge < the search version. Returns all versions
+ *
+ * @param columnFamilies The column families to use
+ * @param scope The application scope
+ * @param search The search criteria
+ * @param shards The shards to iterate when searching
+ */
+ Iterator<MarkedEdge> getEdgeVersions( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+ SearchByEdge search, Iterator<ShardEntryGroup> shards );
+
+ /**
+ * Get an iterator of all edges by edge type originating from source node
+ *
+ * @param columnFamilies The column families to use
+ * @param scope The application scope
+ * @param search The search criteria
+ * @param shards The shards to iterate when searching
+ */
+ Iterator<MarkedEdge> getEdgesFromSource( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+ SearchByEdgeType search, Iterator<ShardEntryGroup> shards );
+
+
+ /**
+ * Get an iterator of all edges by edge type originating from source node. Also filters by target node id type
+ *
+ * @param columnFamilies The column families to use
+ * @param scope The application scope
+ * @param search The search criteria
+ * @param shards The shards to iterate when searching
+ */
+ Iterator<MarkedEdge> getEdgesFromSourceByTargetType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+ SearchByIdType search, Iterator<ShardEntryGroup> shards );
+
+ /**
+ * Get an iterator of all edges by edge type pointing to the target node. Returns all versions
+ *
+ * @param columnFamilies The column families to use
+ * @param scope The application scope
+ * @param search The search criteria
+ * @param shards The shards to iterate when searching
+ */
+ Iterator<MarkedEdge> getEdgesToTarget( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+ SearchByEdgeType search, Iterator<ShardEntryGroup> shards );
+
+
+ /**
+ * Get an iterator of all edges by edge type pointing to the target node. Also uses the source id type to limit the
+ * results
+ *
+ * @param columnFamilies The column families to use
+ * @param scope The application scope
+ * @param search The search criteria
+ * @param shards The shards to iterate when searching
+ */
+ Iterator<MarkedEdge> getEdgesToTargetBySourceType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+ SearchByIdType search, Iterator<ShardEntryGroup> shards );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
index 4318200..f5666a2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
@@ -24,6 +24,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.base.Preconditions;
+
/**
* This class is synchronized for addition. It is meant to be used across multiple threads
@@ -35,7 +37,7 @@ public class Counter {
private final AtomicLong invokeCounter;
/**
- * Pointer to our "current" counter map. We flush this when time expires or we hit our count
+ * Pointer to our "current" counter map. We beginFlush this when time expires or we hit our count
*/
private final ConcurrentHashMap<ShardKey, AtomicLong> counts;
@@ -94,6 +96,10 @@ public class Counter {
* @param other
*/
public void merge(final Counter other){
+
+ Preconditions.checkNotNull(other, "other cannot be null");
+ Preconditions.checkNotNull( other.counts, "other.counts cannot be null" );
+
for(Map.Entry<ShardKey, AtomicLong> entry: other.counts.entrySet()){
add(entry.getKey(), entry.getValue().get());
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
index 7dc763f..47243e5 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
@@ -20,16 +20,22 @@
package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.inject.Inject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
-import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import com.netflix.astyanax.MutationBatch;
import com.netflix.hystrix.HystrixCommand;
@@ -40,10 +46,12 @@ import rx.schedulers.Schedulers;
/**
* Implementation for doing edge approximation based on counters. Uses a guava loading cache to load values from
- * cassandra, and flush them on cache eviction.
+ * cassandra, and beginFlush them on cache eviction.
*/
public class NodeShardApproximationImpl implements NodeShardApproximation {
+ private static final Logger LOG = LoggerFactory.getLogger(NodeShardApproximationImpl.class);
+
/**
* Read write locks to ensure we atomically swap correctly
*/
@@ -63,7 +71,9 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
/**
* The counter that is currently in process of flushing to Cassandra. Can be null
*/
- private volatile Counter flushPending;
+ private final BlockingQueue<Counter> flushQueue;
+
+ private final FlushWorker worker;
/**
@@ -77,15 +87,22 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
this.nodeShardCounterSerialization = nodeShardCounterSerialization;
this.timeService = timeService;
this.currentCounter = new Counter();
+ this.flushQueue = new LinkedBlockingQueue<>( graphFig.getCounterFlushQueueSize() );
+
+ this.worker = new FlushWorker( this.flushQueue, nodeShardCounterSerialization );
+
+ Schedulers.newThread().createWorker().schedule( worker );
+
}
@Override
- public void increment( final ApplicationScope scope, final Id nodeId, final long shardId, final long count,
- final String... edgeType ) {
+ public void increment(
+ final ApplicationScope scope, final Shard shard,
+ final long count, final DirectedEdgeMeta directedEdgeMeta ) {
- final ShardKey key = new ShardKey( scope, nodeId, shardId, edgeType );
+ final ShardKey key = new ShardKey( scope, shard, directedEdgeMeta );
readLock.lock();
@@ -102,10 +119,9 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
@Override
- public long getCount( final ApplicationScope scope, final Id nodeId, final long shardId,
- final String... edgeType ) {
+ public long getCount( final ApplicationScope scope, final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) {
- final ShardKey key = new ShardKey( scope, nodeId, shardId, edgeType );
+ final ShardKey key = new ShardKey( scope, shard, directedEdgeMeta );
readLock.lock();
@@ -115,9 +131,6 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
try {
count = currentCounter.get( key );
- if ( flushPending != null ) {
- count += flushPending.get( key );
- }
}
finally {
readLock.unlock();
@@ -130,78 +143,121 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
@Override
- public void flush() {
+ public void beginFlush() {
writeLockLock.lock();
try {
- flushPending = currentCounter;
- currentCounter = new Counter();
+
+ final boolean queued = flushQueue.offer( currentCounter );
+
+ /**
+ * We were able to q the beginFlush, swap it
+ */
+ if ( queued ) {
+ currentCounter = new Counter();
+ }
}
finally {
writeLockLock.unlock();
}
+ }
- //copy to the batch outside of the command for performance
- final MutationBatch batch = nodeShardCounterSerialization.flush( flushPending );
+ @Override
+ public boolean flushPending() {
+ return flushQueue.size() > 0 || worker.isFlushing();
+ }
- /**
- * Execute the command in hystrix to avoid slamming cassandra
- */
- new HystrixCommand( HystrixCassandra.ASYNC_GROUP ) {
- @Override
- protected Void run() throws Exception {
- /**
- * Execute the batch asynchronously
- */
- batch.execute();
+ /**
+ * Check if we need to beginFlush. If we do, perform the beginFlush
+ */
+ private void checkFlush() {
- return null;
- }
+ //there's no beginFlush pending and we're past the timeout or count
+ if ( currentCounter.getCreateTimestamp() + graphFig.getCounterFlushInterval() > timeService.getCurrentTime()
+ || currentCounter.getInvokeCount() >= graphFig.getCounterFlushCount() ) {
+ beginFlush();
+ }
+ }
- @Override
- protected Object getFallback() {
- //we've failed to mutate. Merge this count back into the current one
- currentCounter.merge( flushPending );
+ /**
+ * Worker that will take from the queue
+ */
+ private static class FlushWorker implements Action0 {
- return null;
- }
- }.execute();
+ private final BlockingQueue<Counter> counterQueue;
+ private final NodeShardCounterSerialization nodeShardCounterSerialization;
- writeLockLock.lock();
+ private volatile Counter rollUp;
- try {
- flushPending = null;
- }
- finally {
- writeLockLock.unlock();
+
+ private FlushWorker( final BlockingQueue<Counter> counterQueue,
+ final NodeShardCounterSerialization nodeShardCounterSerialization ) {
+ this.counterQueue = counterQueue;
+ this.nodeShardCounterSerialization = nodeShardCounterSerialization;
}
- }
- /**
- * Check if we need to flush. If we do, perform the flush
- */
- private void checkFlush() {
+ @Override
+ public void call() {
- //there's no flush pending and we're past the timeout or count
- if ( flushPending == null && (
- currentCounter.getCreateTimestamp() + graphFig.getCounterFlushInterval() > timeService.getCurrentTime()
- || currentCounter.getInvokeCount() >= graphFig.getCounterFlushCount() ) ) {
+ while ( true ) {
+ /**
+ * Block taking the first element. Once we take this, batch drain and roll up the rest
+ */
+
+ try {
+ rollUp = null;
+ rollUp = counterQueue.take();
+ }
+ catch ( InterruptedException e ) {
+ LOG.error( "Unable to read from counter queue", e );
+ throw new RuntimeException( "Unable to read from counter queue", e );
- /**
- * Fire the flush action asynchronously
- */
- Schedulers.immediate().createWorker().schedule( new Action0() {
- @Override
- public void call() {
- flush();
}
- } );
+
+
+
+
+ //copy to the batch outside of the command for performance
+ final MutationBatch batch = nodeShardCounterSerialization.flush( rollUp );
+
+ /**
+ * Execute the command in hystrix to avoid slamming cassandra
+ */
+ new HystrixCommand( HystrixCassandra.ASYNC_GROUP ) {
+
+ @Override
+ protected Void run() throws Exception {
+ batch.execute();
+
+ return null;
+ }
+
+
+ @Override
+ protected Object getFallback() {
+ //we've failed to mutate. Merge this count back into the current one
+ counterQueue.offer( rollUp );
+
+ return null;
+ }
+ }.execute();
+ }
+
+ }
+
+
+ /**
+ * Return true if we're in the process of flushing
+ * @return
+ */
+ public boolean isFlushing(){
+ return rollUp != null;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
index 41eb525..4b05401 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
@@ -27,7 +27,7 @@ import com.netflix.astyanax.MutationBatch;
/**
* Serialization for flushing and reading counters
*/
-public interface NodeShardCounterSerialization extends Migration {
+public interface NodeShardCounterSerialization extends Migration {
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
index da318bf..6b99e93 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
@@ -29,15 +29,20 @@ import org.apache.cassandra.db.marshal.CounterColumnType;
import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.OrganizationScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeRowKey;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeRowKeySerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardRowKeySerializer;
+import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
@@ -48,20 +53,23 @@ import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.model.Column;
-import com.netflix.astyanax.serializers.LongSerializer;
-
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+import com.netflix.astyanax.serializers.BooleanSerializer;
@Singleton
public class NodeShardCounterSerializationImpl implements NodeShardCounterSerialization {
+ private static final ShardKeySerializer SHARD_KEY_SERIALIZER = new ShardKeySerializer();
+
/**
* Edge shards
*/
- private static final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> EDGE_SHARD_COUNTS =
+ private static final MultiTennantColumnFamily<ApplicationScope, ShardKey, Boolean> EDGE_SHARD_COUNTS =
new MultiTennantColumnFamily<>( "Edge_Shard_Counts",
- new OrganizationScopedRowKeySerializer<>( new EdgeRowKeySerializer() ), LongSerializer.get() );
+ new OrganizationScopedRowKeySerializer<>( SHARD_KEY_SERIALIZER ), BooleanSerializer.get() );
protected final Keyspace keyspace;
@@ -92,12 +100,11 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
final ShardKey key = entry.getKey();
final long value = entry.getValue().get();
- final EdgeRowKey edgeRowKey = new EdgeRowKey( key.getNodeId(), key.getEdgeTypes() );
- final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.getScope(), edgeRowKey );
+ final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.scope, key );
- batch.withRow( EDGE_SHARD_COUNTS, rowKey ).incrementCounterColumn( key.getShardId(), value );
+ batch.withRow( EDGE_SHARD_COUNTS, rowKey ).incrementCounterColumn(true , value );
}
@@ -108,14 +115,12 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
@Override
public long getCount( final ShardKey key ) {
- final EdgeRowKey edgeRowKey = new EdgeRowKey( key.getNodeId(), key.getEdgeTypes() );
-
- final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.getScope(), edgeRowKey );
+ final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.scope, key );
try {
- OperationResult<Column<Long>> column =
- keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( key.getShardId() ).execute();
+ OperationResult<Column<Boolean>> column =
+ keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( true ).execute();
return column.getResult().getLongValue();
}
@@ -133,7 +138,50 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
return Collections.singleton(
new MultiTennantColumnFamilyDefinition( EDGE_SHARD_COUNTS, BytesType.class.getSimpleName(),
- ColumnTypes.LONG_TYPE_REVERSED, CounterColumnType.class.getSimpleName(),
- MultiTennantColumnFamilyDefinition.CacheOption.KEYS ) );
+ ColumnTypes.BOOLEAN, CounterColumnType.class.getSimpleName(),
+ MultiTennantColumnFamilyDefinition.CacheOption.ALL ) );
}
+
+
+
+ private static class ShardKeySerializer implements CompositeFieldSerializer<ShardKey> {
+
+
+ private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+ private static final EdgeShardRowKeySerializer EDGE_SHARD_ROW_KEY_SERIALIZER = EdgeShardRowKeySerializer.INSTANCE;
+
+
+ @Override
+ public void toComposite( final CompositeBuilder builder, final ShardKey key ) {
+
+ ID_SER.toComposite( builder, key.scope.getApplication() );
+
+ EDGE_SHARD_ROW_KEY_SERIALIZER.toComposite( builder, key.directedEdgeMeta );
+
+ builder.addLong( key.shard.getShardIndex() );
+
+ builder.addLong( key.shard.getCreatedTime() );
+ }
+
+
+ @Override
+ public ShardKey fromComposite( final CompositeParser composite ) {
+
+ final Id applicationId = ID_SER.fromComposite( composite );
+
+ final ApplicationScope scope = new ApplicationScopeImpl( applicationId );
+
+ final DirectedEdgeMeta directedEdgeMeta = EDGE_SHARD_ROW_KEY_SERIALIZER.fromComposite( composite );
+
+ final long shardIndex = composite.readLong();
+
+ final long shardCreatedTime = composite.readLong();
+
+ return new ShardKey( scope, new Shard( shardIndex, shardCreatedTime, false ), directedEdgeMeta );
+ }
+
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
index 63c87d3..c976210 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
@@ -19,27 +19,24 @@
package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
-import java.util.Arrays;
-
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
/**
* Key for shards and counts
*/
public class ShardKey {
- private final ApplicationScope scope;
- private final Id nodeId;
- private final long shardId;
- private final String[] edgeTypes;
+ public final ApplicationScope scope;
+ public final Shard shard;
+ public final DirectedEdgeMeta directedEdgeMeta;
- public ShardKey( final ApplicationScope scope, final Id nodeId, final long shardId, final String... edgeTypes ) {
+ public ShardKey( final ApplicationScope scope, final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) {
this.scope = scope;
- this.nodeId = nodeId;
- this.shardId = shardId;
- this.edgeTypes = edgeTypes;
+ this.shard = shard;
+ this.directedEdgeMeta = directedEdgeMeta;
}
@@ -54,16 +51,13 @@ public class ShardKey {
final ShardKey shardKey = ( ShardKey ) o;
- if ( shardId != shardKey.shardId ) {
- return false;
- }
- if ( !Arrays.equals( edgeTypes, shardKey.edgeTypes ) ) {
+ if ( !directedEdgeMeta.equals( shardKey.directedEdgeMeta ) ) {
return false;
}
- if ( !nodeId.equals( shardKey.nodeId ) ) {
+ if ( !scope.equals( shardKey.scope ) ) {
return false;
}
- if ( !scope.equals( shardKey.scope ) ) {
+ if ( !shard.equals( shardKey.shard ) ) {
return false;
}
@@ -71,32 +65,11 @@ public class ShardKey {
}
- public ApplicationScope getScope() {
- return scope;
- }
-
-
- public Id getNodeId() {
- return nodeId;
- }
-
-
- public long getShardId() {
- return shardId;
- }
-
-
- public String[] getEdgeTypes() {
- return edgeTypes;
- }
-
-
@Override
public int hashCode() {
int result = scope.hashCode();
- result = 31 * result + nodeId.hashCode();
- result = 31 * result + ( int ) ( shardId ^ ( shardId >>> 32 ) );
- result = 31 * result + Arrays.hashCode( edgeTypes );
+ result = 31 * result + shard.hashCode();
+ result = 31 * result + directedEdgeMeta.hashCode();
return result;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKey.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKey.java
deleted file mode 100644
index f4d8467..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKey.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-
-
-import org.apache.usergrid.persistence.graph.serialization.util.EdgeHasher;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- * Used to store row keys by sourceId, targetId and edgeType
- */
-public class EdgeRowKey {
- public final Id targetId;
- public final long[] edgeTypesHash;
-
-
- public EdgeRowKey( final Id rowId, final String[] edgeTypes ) {
- this( rowId, EdgeHasher.createEdgeHash( edgeTypes ) );
- }
-
-
- public EdgeRowKey( final Id rowId, final long[] hash ) {
- this.targetId = rowId;
- this.edgeTypesHash = hash;
- }
-}
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
index cc615c1..90b264c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
import org.apache.usergrid.persistence.model.entity.Id;
import com.netflix.astyanax.model.CompositeBuilder;
@@ -30,6 +31,7 @@ import com.netflix.astyanax.model.CompositeParser;
/**
* Class to perform serialization for row keys from edges
*/
+
public class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey> {
private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
@@ -39,10 +41,10 @@ public class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey
public void toComposite( final CompositeBuilder builder, final EdgeRowKey key ) {
//add the row id to the composite
+ ID_SER.toComposite( builder, key.sourceId );
+ builder.addString( key.edgeType );
ID_SER.toComposite( builder, key.targetId );
-
- builder.addLong( key.edgeTypesHash[0] );
- builder.addLong( key.edgeTypesHash[1] );
+ builder.addLong( key.shardId );
}
@@ -50,9 +52,12 @@ public class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey
public EdgeRowKey fromComposite( final CompositeParser composite ) {
final Id sourceId = ID_SER.fromComposite( composite );
+ final String edgeType = composite.readString();
+ final Id targetId = ID_SER.fromComposite( composite );
+ final long shard = composite.readLong();
- final long[] hash = { composite.readLong(), composite.readLong() };
-
- return new EdgeRowKey( sourceId, hash );
+ return new EdgeRowKey( sourceId, edgeType, targetId, shard );
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
new file mode 100644
index 0000000..b7cc25d
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -0,0 +1,143 @@
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.Serializer;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.util.RangeBuilder;
+
+
+/**
+ * Searcher to be used when performing the search. Performs I/O transformation as well as parsing for the iterator. If
+ * there are more row keys available to seek, the iterator will return true
+ *
+ * @param <R> The row type
+ * @param <C> The column type
+ * @param <T> The parsed return type
+ */
+public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Comparator<T>,
+ Iterator<List<ScopedRowKey<ApplicationScope, R>>> {
+
+ protected final Optional<Edge> last;
+ protected final long maxTimestamp;
+ protected final ApplicationScope scope;
+ protected final Iterator<ShardEntryGroup> shards;
+
+
+ protected EdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
+ final Iterator<ShardEntryGroup> shards ) {
+
+ Preconditions.checkArgument(shards.hasNext(), "Cannot search with no possible shards");
+
+ this.scope = scope;
+ this.maxTimestamp = maxTimestamp;
+ this.last = last;
+ this.shards = shards;
+ }
+
+
+ @Override
+ public boolean hasNext() {
+ return shards.hasNext();
+ }
+
+
+ @Override
+ public List<ScopedRowKey<ApplicationScope, R>> next() {
+ Collection<Shard> readShards = shards.next().getReadShards();
+
+ List<ScopedRowKey<ApplicationScope, R>> rowKeys = new ArrayList<>(readShards.size());
+
+ for(Shard shard : readShards){
+
+ final ScopedRowKey<ApplicationScope, R> rowKey = ScopedRowKey
+ .fromKey( scope, generateRowKey(shard.getShardIndex() ) );
+
+ rowKeys.add( rowKey );
+ }
+
+
+ return rowKeys;
+ }
+
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException( "Remove is unsupported" );
+ }
+
+
+ /**
+ * Set the range on a search
+ */
+ public void setRange( final RangeBuilder builder ) {
+
+ //set our start range since it was supplied to us
+ if ( last.isPresent() ) {
+ C sourceEdge = getStartColumn( last.get() );
+
+
+ builder.setStart( sourceEdge, getSerializer() );
+ }
+ else {
+
+
+ }
+ }
+
+
+ public boolean hasPage() {
+ return last.isPresent();
+ }
+
+
+ @Override
+ public T parseColumn( final Column<C> column ) {
+ final C edge = column.getName();
+
+ return createEdge( edge, column.getBooleanValue() );
+ }
+
+
+ /**
+ * Get the column's serializer
+ */
+ protected abstract Serializer<C> getSerializer();
+
+
+ /**
+ * Create a row key for this search to use
+ *
+ * @param shard The shard to use in the row key
+ */
+ protected abstract R generateRowKey( final long shard );
+
+
+ /**
+ * Set the start column to begin searching from. The last is provided
+ */
+ protected abstract C getStartColumn( final Edge last );
+
+
+ /**
+ * Create an edge to return to the user based on the directed edge provided
+ *
+ * @param column The column name
+ * @param marked The marked flag in the column value
+ */
+ protected abstract T createEdge( final C column, final boolean marked );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java
new file mode 100644
index 0000000..d93f679
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.nio.ByteBuffer;
+
+import org.apache.usergrid.persistence.core.astyanax.IdColDynamicCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.model.DynamicComposite;
+import com.netflix.astyanax.serializers.AbstractSerializer;
+import com.netflix.astyanax.serializers.LongSerializer;
+
+
+/**
+ * Serializes to a source->target edge Note that we cannot set the edge type on de-serialization. Only the target
+ * Id and version.
+ */
+public class EdgeSerializer extends AbstractSerializer<DirectedEdge> {
+
+ private static final IdColDynamicCompositeSerializer ID_COL_SERIALIZER = IdColDynamicCompositeSerializer.get();
+ private static final LongSerializer LONG_SERIALIZER = LongSerializer.get();
+
+
+ @Override
+ public ByteBuffer toByteBuffer( final DirectedEdge edge ) {
+
+ DynamicComposite composite = new DynamicComposite();
+
+ composite.addComponent( edge.timestamp, LONG_SERIALIZER );
+
+ ID_COL_SERIALIZER.toComposite( composite, edge.id );
+
+ return composite.serialize();
+ }
+
+
+ @Override
+ public DirectedEdge fromByteBuffer( final ByteBuffer byteBuffer ) {
+ DynamicComposite composite = DynamicComposite.fromByteBuffer( byteBuffer );
+
+ Preconditions.checkArgument( composite.size() == 3, "Composite should have 3 elements" );
+
+
+ //return the version
+ final long timestamp = composite.get( 0, LONG_SERIALIZER );
+
+
+ //parse our id
+ final Id id = ID_COL_SERIALIZER.fromComposite( composite, 1 );
+
+
+ return new DirectedEdge( id, timestamp );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardRowKeySerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardRowKeySerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardRowKeySerializer.java
new file mode 100644
index 0000000..0451d68
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardRowKeySerializer.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+
+public class EdgeShardRowKeySerializer implements CompositeFieldSerializer<DirectedEdgeMeta> {
+
+ private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+ public static final EdgeShardRowKeySerializer INSTANCE = new EdgeShardRowKeySerializer();
+
+
+ @Override
+ public void toComposite( final CompositeBuilder builder, final DirectedEdgeMeta meta ) {
+
+
+ final DirectedEdgeMeta.NodeMeta[] nodeMeta = meta.getNodes();
+
+ //add the stored value
+ builder.addInteger( meta.getType().getStorageValue() );
+
+ final int length = nodeMeta.length;
+
+ builder.addInteger( length );
+
+
+ for ( DirectedEdgeMeta.NodeMeta node : nodeMeta ) {
+ ID_SER.toComposite( builder, node.getId() );
+ builder.addInteger( node.getNodeType().getStorageValue() );
+ }
+
+ final String[] edgeTypes = meta.getTypes();
+
+ builder.addInteger( edgeTypes.length );
+
+ for ( String type : edgeTypes ) {
+ builder.addString( type );
+ }
+ }
+
+
+ @Override
+ public DirectedEdgeMeta fromComposite( final CompositeParser composite ) {
+
+
+ final int storageType = composite.readInteger();
+
+ final DirectedEdgeMeta.MetaType metaType = DirectedEdgeMeta.MetaType.fromStorage( storageType );
+
+ final int idLength = composite.readInteger();
+
+ final DirectedEdgeMeta.NodeMeta[] nodePairs = new DirectedEdgeMeta.NodeMeta[idLength];
+
+
+ for ( int i = 0; i < idLength; i++ ) {
+ final Id sourceId = ID_SER.fromComposite( composite );
+
+ final NodeType type = NodeType.get( composite.readInteger() );
+
+ nodePairs[i] = new DirectedEdgeMeta.NodeMeta( sourceId, type );
+ }
+
+
+ final int length = composite.readInteger();
+
+ String[] types = new String[length];
+
+ for ( int i = 0; i < length; i++ ) {
+ types[i] = composite.readString();
+ }
+
+ return DirectedEdgeMeta.fromStorage( metaType, nodePairs, types );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72684dfc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index a08ec3b..8233d0d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -26,19 +26,21 @@ import java.util.Iterator;
import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
+import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.astyanax.OrganizationScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
-import org.apache.usergrid.persistence.core.astyanax.OrganizationScopedRowKeySerializer;
-import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
-import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
-import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -58,14 +60,12 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
/**
* Edge shards
*/
- private static final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> EDGE_SHARDS =
+ private static final MultiTennantColumnFamily<ApplicationScope, DirectedEdgeMeta, Long> EDGE_SHARDS =
new MultiTennantColumnFamily<>( "Edge_Shards",
- new OrganizationScopedRowKeySerializer<>( new EdgeRowKeySerializer() ), LongSerializer.get() );
-
+ new OrganizationScopedRowKeySerializer<>( EdgeShardRowKeySerializer.INSTANCE ), LongSerializer.get() );
- private static final byte HOLDER = 0x00;
- private static final LongColumnParser COLUMN_PARSER = new LongColumnParser();
+ private static final ShardColumnParser COLUMN_PARSER = new ShardColumnParser();
protected final Keyspace keyspace;
@@ -83,30 +83,39 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
@Override
- public MutationBatch writeEdgeMeta( final ApplicationScope scope, final Id nodeId, final long shard,
- final String... types ) {
-
+ public MutationBatch writeShardMeta( final ApplicationScope scope,
+ final Shard shard, final DirectedEdgeMeta metaData) {
ValidationUtils.validateApplicationScope( scope );
- ValidationUtils.verifyIdentity(nodeId);
- Preconditions.checkArgument( shard > -1, "shardId must be greater than -1" );
- Preconditions.checkNotNull( types );
+ GraphValidation.validateDirectedEdgeMeta( metaData );
+
+ Preconditions.checkNotNull( metaData, "metadata must be present" );
- final EdgeRowKey key = new EdgeRowKey( nodeId, types );
+ Preconditions.checkNotNull( shard );
+ Preconditions.checkArgument( shard.getShardIndex() > -1, "shardid must be greater than -1" );
+ Preconditions.checkArgument( shard.getCreatedTime() > -1, "createdTime must be greater than -1" );
- final ScopedRowKey rowKey = ScopedRowKey.fromKey( scope, key );
+ final ScopedRowKey rowKey = ScopedRowKey.fromKey( scope, metaData );
final MutationBatch batch = keyspace.prepareMutationBatch();
- batch.withRow( EDGE_SHARDS, rowKey ).putColumn( shard, HOLDER );
+ batch.withTimestamp( shard.getCreatedTime() ).withRow( EDGE_SHARDS, rowKey )
+ .putColumn( shard.getShardIndex(), shard.isCompacted() );
return batch;
}
@Override
- public Iterator<Long> getEdgeMetaData( final ApplicationScope scope, final Id nodeId, final Optional<Long> start,
- final String... types ) {
+ public Iterator<Shard> getShardMetaData( final ApplicationScope scope,
+ final Optional<Shard> start, final DirectedEdgeMeta metaData ) {
+
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateDirectedEdgeMeta( metaData );
+
+
+ Preconditions.checkNotNull( metaData, "metadata must be present" );
+
/**
* If the edge is present, we need to being seeking from this
*/
@@ -114,15 +123,16 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
final RangeBuilder rangeBuilder = new RangeBuilder().setLimit( graphFig.getScanPageSize() );
if ( start.isPresent() ) {
- rangeBuilder.setStart( start.get() );
+ final Shard shard = start.get();
+ GraphValidation.valiateShard( shard );
+ rangeBuilder.setStart( shard.getShardIndex() );
}
- final EdgeRowKey key = new EdgeRowKey( nodeId, types );
- final ScopedRowKey rowKey = ScopedRowKey.fromKey( scope, key );
+ final ScopedRowKey rowKey = ScopedRowKey.fromKey( scope, metaData );
- final RowQuery<ScopedRowKey<ApplicationScope, EdgeRowKey>, Long> query =
+ final RowQuery<ScopedRowKey<ApplicationScope, DirectedEdgeMeta>, Long> query =
keyspace.prepareQuery( EDGE_SHARDS ).setConsistencyLevel( cassandraConfig.getReadCL() ).getKey( rowKey )
.autoPaginate( true ).withColumnRange( rangeBuilder.build() );
@@ -132,21 +142,20 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
@Override
- public MutationBatch removeEdgeMeta( final ApplicationScope scope, final Id nodeId, final long shard,
- final String... types ) {
+ public MutationBatch removeShardMeta( final ApplicationScope scope,
+ final Shard shard, final DirectedEdgeMeta metaData) {
ValidationUtils.validateApplicationScope( scope );
- ValidationUtils.verifyIdentity(nodeId);
- Preconditions.checkArgument( shard > -1, "shard must be greater than -1" );
- Preconditions.checkNotNull( types );
+ GraphValidation.valiateShard( shard );
+ GraphValidation.validateDirectedEdgeMeta( metaData );
+
- final EdgeRowKey key = new EdgeRowKey( nodeId, types );
- final ScopedRowKey rowKey = ScopedRowKey.fromKey( scope, key );
+ final ScopedRowKey rowKey = ScopedRowKey.fromKey( scope, metaData );
final MutationBatch batch = keyspace.prepareMutationBatch();
- batch.withRow( EDGE_SHARDS, rowKey ).deleteColumn( shard );
+ batch.withRow( EDGE_SHARDS, rowKey ).deleteColumn( shard.getShardIndex() );
return batch;
}
@@ -163,11 +172,15 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
}
- private static class LongColumnParser implements ColumnParser<Long, Long> {
+
+
+
+
+ private static class ShardColumnParser implements ColumnParser<Long, Shard> {
@Override
- public Long parseColumn( final Column<Long> column ) {
- return column.getName();
+ public Shard parseColumn( final Column<Long> column ) {
+ return new Shard( column.getName(), column.getTimestamp(), column.getBooleanValue() );
}
}
}