You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/12 19:00:47 UTC
[6/9] git commit: Adding utilities. Need to fix bad multiget
semantics for a merged iterator
Adding utilities. Need to fix bad multiget semantics for a merged iterator
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/aafb63c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/aafb63c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/aafb63c8
Branch: refs/heads/USERGRID-188
Commit: aafb63c83f8d540b40412a4e25efbd5131cc9dda
Parents: 8786460
Author: Todd Nine <to...@apache.org>
Authored: Thu Aug 7 16:35:07 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Aug 7 16:35:07 2014 -0600
----------------------------------------------------------------------
.../astyanax/MultiKeyColumnNameIterator.java | 152 +++++++++++++++++++
.../impl/shard/NodeShardCache.java | 3 +-
.../impl/shard/impl/EdgeSearcher.java | 29 +++-
.../shard/impl/NodeShardAllocationImpl.java | 18 ---
.../impl/shard/impl/NodeShardCacheImpl.java | 1 -
.../impl/shard/impl/ShardRowIterator.java | 18 ++-
6 files changed, 189 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aafb63c8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
new file mode 100644
index 0000000..dede4ab
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
@@ -0,0 +1,152 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.core.astyanax;
+
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Observable;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.model.Row;
+import com.netflix.astyanax.query.RowQuery;
+import com.netflix.astyanax.query.RowSliceQuery;
+import com.netflix.hystrix.HystrixCommandGroupKey;
+
+
+/**
+ * Simple iterator that wraps a collection of ColumnNameIterators. We do this because we can't page with a multiRangeScan
+ * correctly for multiple trips. As a result, we do this since only 1 iterator with minimum values could potentially
+ * feed the entire result set.
+ *
+ * Compares the parsed values and puts them in order. If more than one row key emits the same value
+ * the first value is selected, and ignored from subsequent iterators.
+ *
+ */
+public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
+
+
+ private static final HystrixCommandGroupKey GROUP_KEY = HystrixCommandGroupKey.Factory.asKey( "CassRead" );
+
+
+ private final Comparator<T> comparator;
+
+
+ public MultiKeyColumnNameIterator(Collection<ColumnNameIterator<C, T>> columnNameIterators, final Comparator<T> comparator) {
+
+ this.comparator = comparator;
+
+
+ advanceIterator();
+ }
+
+
+ @Override
+ public Iterator<T> iterator() {
+ return this;
+ }
+
+
+ @Override
+ public boolean hasNext() {
+ //if we've exhausted this iterator, try to advance to the next set
+ if ( sourceIterator.hasNext() ) {
+ return true;
+ }
+
+ //advance the iterator, to the next page, there could be more
+ advanceIterator();
+
+ return sourceIterator.hasNext();
+ }
+
+
+ @Override
+ public T next() {
+
+ if ( !hasNext() ) {
+ throw new NoSuchElementException();
+ }
+
+ return parser.parseColumn( sourceIterator.next() );
+ }
+
+
+ @Override
+ public void remove() {
+ sourceIterator.remove();
+ }
+
+
+ /**
+ * Execute the query again and set the reuslts
+ */
+ private void advanceIterator() {
+
+ //run producing the values within a hystrix command. This way we'll time out if the read takes too long
+ try {
+ sourceIterator = rowQuery.execute().getResult().iterator();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to get next page", e );
+ }
+ }
+
+
+ private final class MultiColumIteratorSet{
+
+ private TreeMap<T, ColumnNameIterator<C, T>> sources;
+
+
+
+
+ public boolean isInitialized(){
+ return sources != null;
+ }
+
+
+
+ public boolean hasNext(){
+ return sources == null && sources.size() > 0;
+ }
+
+
+ public void initialize(Collection<ColumnNameIterator<C, T>> columnNameIterators){
+
+
+
+ for (ColumnNameIterator)
+
+ }
+
+
+
+
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aafb63c8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
index b39e7e5..561706a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
@@ -44,7 +44,8 @@ public interface NodeShardCache {
final String... edgeType );
/**
- * Get an iterator of all versions <= the version for iterating shard entry sets
+ * Get an iterator of all versions <= the version for iterating shard entry sets. The iterator of groups will be ordered
+ * highest to lowest. I.E range scanning from Long.MAX_VALUE to 0
* @param scope
* @param nodeId
* @para nodeType
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aafb63c8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
index 8b39115..b9b2477 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -1,12 +1,17 @@
package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
import java.util.Iterator;
+import java.util.List;
import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
import com.google.common.base.Optional;
@@ -23,7 +28,8 @@ import com.netflix.astyanax.util.RangeBuilder;
* @param <C> The column type
* @param <T> The parsed return type
*/
-public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Iterator<ScopedRowKey<ApplicationScope, R>> {
+public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Comparator<T>,
+ Iterator<List<ScopedRowKey<ApplicationScope, R>>> {
protected final Optional<Edge> last;
protected final long maxTimestamp;
@@ -47,12 +53,21 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Itera
@Override
- public ScopedRowKey<ApplicationScope, R> next() {
- /**
- * TODO Shard fix this
- */
- return ScopedRowKey
- .fromKey( scope, generateRowKey( shards.next().getCompactionTarget().getShardIndex() ) );
+ public List<ScopedRowKey<ApplicationScope, R>> next() {
+ Collection<Shard> readShards = shards.next().getReadShards();
+
+ List<ScopedRowKey<ApplicationScope, R>> rowKeys = new ArrayList<>(readShards.size());
+
+ for(Shard shard : shards.next().getReadShards()){
+
+ final ScopedRowKey<ApplicationScope, R> rowKey = ScopedRowKey
+ .fromKey( scope, generateRowKey(shard.getShardIndex() ) );
+
+ rowKeys.add( rowKey );
+ }
+
+
+ return rowKeys;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aafb63c8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 286ed67..4052d8f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -251,22 +251,4 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
return timeService.getCurrentTime() - minDelta;
}
-
-
- /**
- * Sorts by minimum time first. If 2 times are equal, the min shard value is taken
- */
- private static final class MinShardTimeComparator implements Comparator<Shard> {
-
- @Override
- public int compare( final Shard s1, final Shard s2 ) {
- int result = Long.compare( s1.getCreatedTime(), s2.getCreatedTime() );
-
- if ( result == 0 ) {
- result = Long.compare( s1.getShardIndex(), s2.getShardIndex() );
- }
-
- return result;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aafb63c8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index 9fcd7d6..a4867f9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -23,7 +23,6 @@ import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aafb63c8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
index 1da85e1..d33f235 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
@@ -2,9 +2,11 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
import java.util.Iterator;
+import java.util.List;
import java.util.NoSuchElementException;
import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -12,6 +14,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.query.RowQuery;
+import com.netflix.astyanax.query.RowSliceQuery;
import com.netflix.astyanax.util.RangeBuilder;
@@ -99,14 +102,19 @@ public class ShardRowIterator<R, C, T> implements Iterator<T> {
//set the range into the search
searcher.setRange( rangeBuilder );
- final ScopedRowKey<ApplicationScope, R> rowKey = searcher.next();
+ /**
+ * Get our list of slices
+ */
+ final List<ScopedRowKey<ApplicationScope, R>> rowKeys = searcher.next();
+
+
+ RowSliceQuery<ScopedRowKey<ApplicationScope, R>, C> query =
+ keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( rowKeys )
+ .withColumnRange( rangeBuilder.build() );
- RowQuery<ScopedRowKey<ApplicationScope, R>, C> query =
- keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKey( rowKey )
- .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
+ currentColumnIterator = new MultiKeyColumnNameIterator<>(query, searcher, searcher, searcher.hasNext() );
- currentColumnIterator = new ColumnNameIterator<C, T>( query, searcher, searcher.hasPage() );
}
}