You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/10 19:37:19 UTC
[1/5] incubator-usergrid git commit: change timer for buffer consumer
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o d901d38ca -> 06e7ad6c0
change timer for buffer consumer
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5cc521f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5cc521f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5cc521f4
Branch: refs/heads/two-dot-o
Commit: 5cc521f472699633a8a8cb3006c79ab5da15f632
Parents: d901d38
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Mar 9 12:46:48 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Mar 9 12:46:48 2015 -0600
----------------------------------------------------------------------
.../persistence/index/impl/EsIndexBufferConsumerImpl.java | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5cc521f4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index d952d78..efde88f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -88,18 +88,17 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
List<IndexOperationMessage> drainList = new ArrayList<>(config.getIndexBufferSize() + 1);
do {
try {
- Timer.Context timer = produceTimer.time();
IndexOperationMessage polled = producerQueue.poll(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS);
if(polled!=null) {
+ Timer.Context timer = produceTimer.time();
drainList.add(polled);
producerQueue.drainTo(drainList, config.getIndexBufferSize());
for(IndexOperationMessage drained : drainList){
subscriber.onNext(drained);
}
drainList.clear();
+ timer.stop();
}
- timer.stop();
-
} catch (InterruptedException ie) {
log.error("failed to dequeue", ie);
}
[5/5] incubator-usergrid git commit: Fixes ttl on map entry keys
Posted by gr...@apache.org.
Fixes ttl on map entry keys
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/06e7ad6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/06e7ad6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/06e7ad6c
Branch: refs/heads/two-dot-o
Commit: 06e7ad6c059ed295238c24a2610e2047bf79c0d4
Parents: 5d95ebb
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 12:19:23 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 12:19:23 2015 -0600
----------------------------------------------------------------------
.../map/impl/MapSerializationImpl.java | 37 +++++++++++++++-----
1 file changed, 29 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/06e7ad6c/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index d3bd3c5..715c202 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -129,10 +129,15 @@ public class MapSerializationImpl implements MapSerialization {
public void putString( final MapScope scope, final String key, final String value ) {
final RowOp op = new RowOp() {
@Override
- public void rowOp( final ScopedRowKey<MapEntryKey> scopedRowKey,
- final ColumnListMutation<Boolean> columnListMutation ) {
+ public void putValue(final ColumnListMutation<Boolean> columnListMutation ) {
columnListMutation.putColumn( true, value );
}
+
+
+ @Override
+ public void putKey(final ColumnListMutation<String> keysMutation ) {
+ keysMutation.putColumn( key, true );
+ }
};
@@ -146,10 +151,15 @@ public class MapSerializationImpl implements MapSerialization {
final RowOp op = new RowOp() {
@Override
- public void rowOp( final ScopedRowKey<MapEntryKey> scopedRowKey,
- final ColumnListMutation<Boolean> columnListMutation ) {
+ public void putValue( final ColumnListMutation<Boolean> columnListMutation ) {
columnListMutation.putColumn( true, value, ttl );
}
+
+
+ @Override
+ public void putKey( final ColumnListMutation<String> keysMutation ) {
+ keysMutation.putColumn( key, true, ttl );
+ }
};
@@ -179,7 +189,7 @@ public class MapSerializationImpl implements MapSerialization {
// entry
- rowOp.rowOp( entryRowKey, batch.withRow( MAP_ENTRIES, entryRowKey ) );
+ rowOp.putValue( batch.withRow( MAP_ENTRIES, entryRowKey ) );
//add it to the keys
@@ -189,20 +199,31 @@ public class MapSerializationImpl implements MapSerialization {
final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
//serialize to the entry
- batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
+
+ rowOp.putKey( batch.withRow( MAP_KEYS, keyRowKey ) );
executeBatch( batch );
}
+
+ /**
+ * Callbacks for performing row operations
+ */
private static interface RowOp{
/**
* Callback to do the row
- * @param scopedRowKey The row key
* @param columnListMutation The column mutation
*/
- void rowOp(final ScopedRowKey<MapEntryKey> scopedRowKey, final ColumnListMutation<Boolean> columnListMutation);
+ void putValue( final ColumnListMutation<Boolean> columnListMutation );
+
+
+ /**
+ * Write the key
+ * @param keysMutation
+ */
+ void putKey( final ColumnListMutation<String> keysMutation );
}
[3/5] incubator-usergrid git commit: Added a timeout to the map module
Posted by gr...@apache.org.
Added a timeout to the map module
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e27f0906
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e27f0906
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e27f0906
Branch: refs/heads/two-dot-o
Commit: e27f090656f43203941475db6072e4e03ac62ae3
Parents: 0555622
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Mar 9 15:22:11 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Mar 9 15:22:11 2015 -0600
----------------------------------------------------------------------
.../usergrid/persistence/map/MapManager.java | 8 ++
.../persistence/map/impl/MapManagerImpl.java | 6 ++
.../persistence/map/impl/MapSerialization.java | 20 +++--
.../map/impl/MapSerializationImpl.java | 85 ++++++++++++++++----
.../persistence/map/MapManagerTest.java | 49 ++++++++++-
5 files changed, 142 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e27f0906/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
index e6d8125..62fe57d 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
@@ -38,6 +38,14 @@ public interface MapManager {
*/
public void putString( final String key, final String value );
+ /**
+ * The time to live (in seconds) of the string
+ * @param key
+ * @param value
+ * @param ttl
+ */
+ public void putString( final String key, final String value, final int ttl );
+
/**
* Return the uuid, null if not found
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e27f0906/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
index e582e55..c077c7d 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
@@ -57,6 +57,12 @@ public class MapManagerImpl implements MapManager {
@Override
+ public void putString( final String key, final String value, final int ttl ) {
+ mapSerialization.putString( scope, key, value, ttl );
+ }
+
+
+ @Override
public UUID getUuid( final String key ) {
return mapSerialization.getUuid(scope,key);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e27f0906/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
index 33f2043..6e7e328 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
@@ -18,6 +18,8 @@
*/
package org.apache.usergrid.persistence.map.impl;
+
+
import java.util.UUID;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
@@ -33,32 +35,38 @@ public interface MapSerialization extends Migration {
/**
* Return the string, null if not found
*/
- public void putString(final MapScope scope, final String key, final String value );
+ public void putString( final MapScope scope, final String key, final String value );
+
+ /**
+ * Write the string
+ */
+ public void putString( final MapScope scope, final String key, final String value, final int ttl );
/**
* Return the uuid, null if not found
*/
- public UUID getUuid(final MapScope scope, final String key );
+ public UUID getUuid( final MapScope scope, final String key );
/**
* Return the uuid, null if not found
*/
- public void putUuid(final MapScope scope, final String key, final UUID putUuid );
+ public void putUuid( final MapScope scope, final String key, final UUID putUuid );
/**
* Return the long, null if not found
*/
- public Long getLong(final MapScope scope, final String key );
+ public Long getLong( final MapScope scope, final String key );
/**
* Return the long, null if not found
*/
- public void putLong(final MapScope scope, final String key, final Long value );
+ public void putLong( final MapScope scope, final String key, final Long value );
/**
* Delete the key
*
* @param key The key used to delete the entry
*/
- public void delete(final MapScope scope, final String key );}
+ public void delete( final MapScope scope, final String key );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e27f0906/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index 1b38d3c..d3bd3c5 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -42,6 +42,7 @@ import com.google.common.hash.Funnel;
import com.google.common.hash.PrimitiveSink;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
@@ -126,34 +127,86 @@ public class MapSerializationImpl implements MapSerialization {
@Override
public void putString( final MapScope scope, final String key, final String value ) {
- Preconditions.checkNotNull(scope, "mapscope is required");
+ final RowOp op = new RowOp() {
+ @Override
+ public void rowOp( final ScopedRowKey<MapEntryKey> scopedRowKey,
+ final ColumnListMutation<Boolean> columnListMutation ) {
+ columnListMutation.putColumn( true, value );
+ }
+ };
+
+
+ writeString( scope, key, value, op );
+ }
+
+
+ @Override
+ public void putString( final MapScope scope, final String key, final String value, final int ttl ) {
+ Preconditions.checkArgument( ttl > 0, "ttl must be > than 0" );
+
+ final RowOp op = new RowOp() {
+ @Override
+ public void rowOp( final ScopedRowKey<MapEntryKey> scopedRowKey,
+ final ColumnListMutation<Boolean> columnListMutation ) {
+ columnListMutation.putColumn( true, value, ttl );
+ }
+ };
+
+
+ writeString( scope, key, value, op );
+ }
+
+
+ /**
+ * Write our string index with the specified row op
+ * @param scope
+ * @param key
+ * @param value
+ * @param rowOp
+ */
+ private void writeString( final MapScope scope, final String key, final String value, final RowOp rowOp ) {
+
+ Preconditions.checkNotNull( scope, "mapscope is required" );
Preconditions.checkNotNull( key, "key is required" );
Preconditions.checkNotNull( value, "value is required" );
final MutationBatch batch = keyspace.prepareMutationBatch();
//add it to the entry
- final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+ final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
+
+ //serialize to the
+ // entry
+
+
+ rowOp.rowOp( entryRowKey, batch.withRow( MAP_ENTRIES, entryRowKey ) );
- //serialize to the entry
- batch.withRow(MAP_ENTRIES, entryRowKey).putColumn(true, value);
//add it to the keys
final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
- final BucketScopedRowKey< String> keyRowKey =
- BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket);
+ final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
//serialize to the entry
- batch.withRow(MAP_KEYS, keyRowKey).putColumn(key, true);
+ batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
- executeBatch(batch);
+
+ executeBatch( batch );
}
+ private static interface RowOp{
+ /**
+ * Callback to do the row
+ * @param scopedRowKey The row key
+ * @param columnListMutation The column mutation
+ */
+ void rowOp(final ScopedRowKey<MapEntryKey> scopedRowKey, final ColumnListMutation<Boolean> columnListMutation);
+ }
+
@Override
public UUID getUuid( final MapScope scope, final String key ) {
@@ -255,18 +308,18 @@ public class MapSerializationImpl implements MapSerialization {
@Override
public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
- final MultiTennantColumnFamilyDefinition mapEntries =
+ final MultiTennantColumnFamilyDefinition mapEntries =
new MultiTennantColumnFamilyDefinition( MAP_ENTRIES,
- BytesType.class.getSimpleName(),
- BytesType.class.getSimpleName(),
- BytesType.class.getSimpleName(),
+ BytesType.class.getSimpleName(),
+ BytesType.class.getSimpleName(),
+ BytesType.class.getSimpleName(),
MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
- final MultiTennantColumnFamilyDefinition mapKeys =
+ final MultiTennantColumnFamilyDefinition mapKeys =
new MultiTennantColumnFamilyDefinition( MAP_KEYS,
- BytesType.class.getSimpleName(),
- UTF8Type.class.getSimpleName(),
- BytesType.class.getSimpleName(),
+ BytesType.class.getSimpleName(),
+ UTF8Type.class.getSimpleName(),
+ BytesType.class.getSimpleName(),
MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
return Arrays.asList( mapEntries, mapKeys );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e27f0906/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
index 28000fe..df4394e 100644
--- a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
+++ b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
@@ -20,7 +20,9 @@
package org.apache.usergrid.persistence.map;
-import org.apache.usergrid.persistence.core.test.UseModules;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -28,14 +30,13 @@ import org.junit.runner.RunWith;
import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
import org.apache.usergrid.persistence.map.guice.TestMapModule;
import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import com.google.inject.Inject;
-import java.util.UUID;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -76,6 +77,40 @@ public class MapManagerTest {
assertEquals( value, returned );
}
+
+ @Test
+ public void writeReadStringTTL() throws InterruptedException {
+
+ MapManager mm = mmf.createMapManager( this.scope );
+
+ final String key = "key";
+ final String value = "value";
+ final int ttl = 5;
+
+
+ mm.putString( key, value, ttl );
+
+ final long startTime = System.currentTimeMillis();
+
+ final String returned = mm.getString( key );
+
+ assertEquals( value, returned );
+
+ final long endTime = startTime + TimeUnit.SECONDS.toMillis( ttl + 1 );
+
+ final long remaining = endTime - System.currentTimeMillis();
+
+ //now sleep and assert it gets removed
+ Thread.sleep( remaining );
+
+ //now read it should be gone
+ final String timedOut = mm.getString( key );
+
+ assertNull("Value was not returned", timedOut);
+
+ }
+
+
@Test
public void writeReadUUID() {
MapManager mm = mmf.createMapManager( this.scope );
@@ -90,6 +125,7 @@ public class MapManagerTest {
assertEquals( value, returned );
}
+
@Test
public void writeReadLong() {
MapManager mm = mmf.createMapManager( this.scope );
@@ -143,6 +179,7 @@ public class MapManagerTest {
assertNull( postDelete );
}
+
@Test
public void deleteUUID() {
MapManager mm = mmf.createMapManager( this.scope );
@@ -163,6 +200,7 @@ public class MapManagerTest {
assertNull( postDelete );
}
+
@Test
public void deleteLong() {
MapManager mm = mmf.createMapManager( this.scope );
@@ -191,14 +229,17 @@ public class MapManagerTest {
mm.putString( null, null );
}
+
@Test( expected = NullPointerException.class )
public void nullInputLong() {
MapManager mm = mmf.createMapManager( this.scope );
mm.putLong( null, null );
}
+
+
@Test( expected = NullPointerException.class )
- public void nullInputUUID() {
+ public void nullInputUUID() {
MapManager mm = mmf.createMapManager( this.scope );
mm.putUuid( null, null );
[4/5] incubator-usergrid git commit: Addes Cache to
MapMangerFactorImpl for efficiency
Posted by gr...@apache.org.
Addes Cache to MapMangerFactorImpl for efficiency
Adds timeout for string storage in our map
Adds time UUID generated cursors to avoid USERGRID-461. Note that the map CF will need to be tuned appropriately. I.E lower gc_grace times since columns are expire and are never deleted. SSD's with key cache, or row+ key cache for spinning disks at the physical tier.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5d95ebba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5d95ebba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5d95ebba
Branch: refs/heads/two-dot-o
Commit: 5d95ebbadc37609c200ea8def435b945c9641bac
Parents: e27f090
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Mar 9 16:14:21 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Mar 9 16:14:21 2015 -0600
----------------------------------------------------------------------
.../persistence/map/MapManagerFactory.java | 2 +
.../persistence/map/guice/MapModule.java | 6 +-
.../map/impl/MapManagerFactoryImpl.java | 67 ++++++++++
stack/corepersistence/queryindex/pom.xml | 16 ++-
.../persistence/index/guice/IndexModule.java | 4 +
.../index/impl/EsEntityIndexFactoryImpl.java | 12 +-
.../index/impl/EsEntityIndexImpl.java | 57 +++++++--
.../persistence/index/impl/IndexingUtils.java | 2 +-
.../persistence/index/utils/StringUtils.java | 126 ++-----------------
.../persistence/index/utils/UUIDUtils.java | 4 -
.../persistence/index/impl/EntityIndexTest.java | 99 ++++++++++++++-
11 files changed, 247 insertions(+), 148 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d95ebba/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManagerFactory.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManagerFactory.java
index a60cdfc..81531c9 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManagerFactory.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManagerFactory.java
@@ -27,4 +27,6 @@ public interface MapManagerFactory {
* Get the map manager
*/
public MapManager createMapManager( final MapScope scope );
+
+ void invalidate();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d95ebba/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/guice/MapModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/guice/MapModule.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/guice/MapModule.java
index d167151..5c46c87 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/guice/MapModule.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/guice/MapModule.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.map.guice;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.impl.MapManagerFactoryImpl;
import org.apache.usergrid.persistence.map.impl.MapManagerImpl;
import org.apache.usergrid.persistence.map.impl.MapSerialization;
import org.apache.usergrid.persistence.map.impl.MapSerializationImpl;
@@ -43,12 +44,11 @@ public class MapModule extends AbstractModule {
protected void configure() {
// create a guice factory for getting our collection manager
- install( new FactoryModuleBuilder().implement( MapManager.class, MapManagerImpl.class )
- .build( MapManagerFactory.class ) );
-
+ bind(MapManagerFactory.class).to( MapManagerFactoryImpl.class );
bind( MapSerialization.class).to( MapSerializationImpl.class );
+
Multibinder<Migration> migrationBinding = Multibinder.newSetBinder( binder(), Migration.class );
migrationBinding.addBinding().to( Key.get( MapSerialization.class ) );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d95ebba/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerFactoryImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerFactoryImpl.java
new file mode 100644
index 0000000..e69a02e
--- /dev/null
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerFactoryImpl.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.map.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Execution;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Returns map managers, built to handle caching
+ */
+@Singleton
+public class MapManagerFactoryImpl implements MapManagerFactory {
+ private final MapSerialization mapSerialization;
+ private LoadingCache<MapScope, MapManager> mmCache =
+ CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<MapScope, MapManager>() {
+ public MapManager load( MapScope scope ) {
+ return new MapManagerImpl(scope,mapSerialization);
+ }
+ } );
+
+ @Inject
+ public MapManagerFactoryImpl(final MapSerialization mapSerialization){
+
+ this.mapSerialization = mapSerialization;
+ }
+
+ @Override
+ public MapManager createMapManager(MapScope scope) {
+ Preconditions.checkNotNull(scope);
+ try{
+ return mmCache.get(scope);
+ }catch (ExecutionException ee){
+ throw new RuntimeException(ee);
+ }
+ }
+
+ @Override
+ public void invalidate() {
+ mmCache.invalidateAll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d95ebba/stack/corepersistence/queryindex/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/pom.xml b/stack/corepersistence/queryindex/pom.xml
index 8604316..f6ae718 100644
--- a/stack/corepersistence/queryindex/pom.xml
+++ b/stack/corepersistence/queryindex/pom.xml
@@ -50,10 +50,10 @@
<artifactId>chop-maven-plugin</artifactId>
<version>${chop.version}</version>
-
+
NOTE: you should be putting most of these variables into your settings.xml
as an automatically activated profile.
-
+
<configuration>
<accessKey>${aws.s3.key}</accessKey>
@@ -74,11 +74,11 @@
<runnerKeyPairName>${runner.keypair.name}</runnerKeyPairName>
<runnerCount>6</runnerCount>
<securityGroupExceptions>
-
+
Add your own IP address as an exception to allow access
but please do this in the settings.xml file .. essentially
all parameters should be in the settings.xml file.
-
+
<param>${myip.address}/32:24981</param>
<param>${myip.address}/32:22</param>
</securityGroupExceptions>
@@ -99,6 +99,14 @@
<type>jar</type>
</dependency>
+
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>map</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ </dependency>
+
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d95ebba/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index 6fee17e..ebd9098 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -27,6 +27,8 @@ import org.apache.usergrid.persistence.index.impl.EsEntityIndexFactoryImpl;
import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
import org.apache.usergrid.persistence.index.impl.EsIndexBufferConsumerImpl;
import org.apache.usergrid.persistence.index.impl.EsIndexBufferProducerImpl;
+import org.apache.usergrid.persistence.map.guice.MapModule;
+
import org.safehaus.guicyfig.GuicyFigModule;
@@ -38,6 +40,8 @@ public class IndexModule extends AbstractModule {
// install our configuration
install(new GuicyFigModule(IndexFig.class));
+ install(new MapModule());
+
bind(EntityIndexFactory.class).to( EsEntityIndexFactoryImpl.class );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d95ebba/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
index 3a9f790..8af309d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
@@ -23,7 +23,6 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -31,6 +30,7 @@ import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexBufferProducer;
import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
import java.util.concurrent.ExecutionException;
@@ -44,23 +44,29 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
private final EsIndexCache indexCache;
private final IndexBufferProducer indexBatchBufferProducer;
private final MetricsFactory metricsFactory;
+ private final MapManagerFactory mapManagerFactory;
+ private final IndexFig indexFig;
private LoadingCache<ApplicationScope, EntityIndex> eiCache =
CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<ApplicationScope, EntityIndex>() {
public EntityIndex load( ApplicationScope scope ) {
- return new EsEntityIndexImpl(scope,config, indexBatchBufferProducer, provider,indexCache, metricsFactory);
+ return new EsEntityIndexImpl(scope,config, indexBatchBufferProducer, provider,indexCache, metricsFactory,
+ mapManagerFactory, indexFig );
}
} );
@Inject
public EsEntityIndexFactoryImpl( final IndexFig config, final EsProvider provider, final EsIndexCache indexCache,
final IndexBufferProducer indexBatchBufferProducer,
- final MetricsFactory metricsFactory ){
+ final MetricsFactory metricsFactory, final MapManagerFactory mapManagerFactory,
+ final IndexFig indexFig ){
this.config = config;
this.provider = provider;
this.indexCache = indexCache;
this.indexBatchBufferProducer = indexBatchBufferProducer;
this.metricsFactory = metricsFactory;
+ this.mapManagerFactory = mapManagerFactory;
+ this.indexFig = indexFig;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d95ebba/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index c9f5590..c92b299 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.index.impl;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -38,6 +39,11 @@ import org.apache.usergrid.persistence.index.exceptions.IndexException;
import org.apache.usergrid.persistence.index.query.CandidateResult;
import org.apache.usergrid.persistence.index.query.CandidateResults;
import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.index.utils.UUIDUtils;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -81,6 +87,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.*;
@@ -99,6 +106,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
private final IndexIdentifier.IndexAlias alias;
private final IndexIdentifier indexIdentifier;
private final IndexBufferProducer indexBatchBufferProducer;
+ private final IndexFig indexFig;
private final Timer addTimer;
private final Timer addWriteAliasTimer;
private final Timer addReadAliasTimer;
@@ -141,14 +149,18 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
private Timer allVersionsTimer;
private Timer deletePreviousTimer;
+ private final MapManager mapManager;
+
// private final Timer indexTimer;
@Inject
public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config,
final IndexBufferProducer indexBatchBufferProducer, final EsProvider provider,
- final EsIndexCache indexCache, final MetricsFactory metricsFactory) {
+ final EsIndexCache indexCache, final MetricsFactory metricsFactory,
+ final MapManagerFactory mapManagerFactory, final IndexFig indexFig ) {
this.indexBatchBufferProducer = indexBatchBufferProducer;
+ this.indexFig = indexFig;
ValidationUtils.validateApplicationScope( appScope );
this.applicationScope = appScope;
this.esProvider = provider;
@@ -181,6 +193,10 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
.getTimer( EsEntityIndexImpl.class, "es.entity.index.delete.all.versions.timer" );
this.deletePreviousTimer = metricsFactory
.getTimer( EsEntityIndexImpl.class, "es.entity.index.delete.previous.versions.timer" );
+
+ final MapScope mapScope = new MapScopeImpl( appScope.getApplication(), "cursorcache" );
+
+ mapManager = mapManagerFactory.createMapManager( mapScope );
}
@Override
@@ -467,17 +483,26 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
failureMonitor.success();
}
else {
- String scrollId = query.getCursor();
- if ( scrollId.startsWith( "\"" ) ) {
- scrollId = scrollId.substring( 1 );
+ String userCursorString = query.getCursor();
+ if ( userCursorString.startsWith( "\"" ) ) {
+ userCursorString = userCursorString.substring( 1 );
}
- if ( scrollId.endsWith( "\"" ) ) {
- scrollId = scrollId.substring( 0, scrollId.length() - 1 );
+ if ( userCursorString.endsWith( "\"" ) ) {
+ userCursorString = userCursorString.substring( 0, userCursorString.length() - 1 );
}
- logger.debug( "Executing query with cursor: {} ", scrollId );
+
+ //now get the cursor from the map and validate
+ final String esScrollCursor = mapManager.getString( userCursorString );
+
+ Preconditions.checkArgument(esScrollCursor != null, "Could not find a cursor for the value '{}' ", esScrollCursor);
+
+
+
+ logger.debug( "Executing query with cursor: {} ", esScrollCursor );
+
SearchScrollRequestBuilder ssrb = esProvider.getClient()
- .prepareSearchScroll(scrollId).setScroll( cursorTimeout + "m" );
+ .prepareSearchScroll(esScrollCursor).setScroll( cursorTimeout + "m" );
try {
//Added For Graphite Metrics
@@ -524,8 +549,20 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
CandidateResults candidateResults = new CandidateResults( query, candidates );
if ( candidates.size() >= query.getLimit() ) {
- candidateResults.setCursor( searchResponse.getScrollId() );
- logger.debug(" Cursor = " + searchResponse.getScrollId());
+ //USERGRID-461 our cursor is getting too large, map it to a new time UUID
+
+ final String userCursorString = org.apache.usergrid.persistence.index.utils.StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
+
+ final String esScrollCursor = searchResponse.getScrollId();
+
+ //now set this into our map module
+ final int minutes = indexFig.getQueryCursorTimeout();
+
+ //just truncate it, we'll never hit a long value anyway
+ mapManager.putString( userCursorString, esScrollCursor, ( int ) TimeUnit.MINUTES.toSeconds( minutes ) );
+
+ candidateResults.setCursor( userCursorString );
+ logger.debug(" User cursor = {}, Cursor = {} ", userCursorString, esScrollCursor);
}
return candidateResults;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d95ebba/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
index be0c96c..ffd98e9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
@@ -152,7 +152,7 @@ public class IndexingUtils {
.startObject()
/** add routing "_routing":{ "required":false, "path":"ug_entityId" **/
- //.startObject("_routing").field("required",true).field("path",ENTITYID_ID_FIELDNAME).endObject()
+ .startObject("_routing").field("required",true).field("path",ENTITYID_ID_FIELDNAME).endObject()
.startArray("dynamic_templates")
// we need most specific mappings first since it's a stop on match algorithm
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d95ebba/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/StringUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/StringUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/StringUtils.java
index 8aabbbf..a567594 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/StringUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/StringUtils.java
@@ -18,6 +18,7 @@ package org.apache.usergrid.persistence.index.utils;
import java.util.Arrays;
+import java.util.UUID;
import org.apache.commons.io.IOUtils;
@@ -32,37 +33,6 @@ public class StringUtils extends org.apache.commons.lang.StringUtils {
private static final Logger LOG = LoggerFactory.getLogger( StringUtils.class );
- public static Object lower( Object obj ) {
- if ( !( obj instanceof String ) ) {
- return obj;
- }
- return ( ( String ) obj ).toLowerCase();
- }
-
-
- public static String stringOrSubstringAfterLast( String str, char c ) {
- if ( str == null ) {
- return null;
- }
- int i = str.lastIndexOf( c );
- if ( i != -1 ) {
- return str.substring( i + 1 );
- }
- return str;
- }
-
-
- public static String stringOrSubstringBeforeLast( String str, char c ) {
- if ( str == null ) {
- return null;
- }
- int i = str.lastIndexOf( c );
- if ( i != -1 ) {
- return str.substring( 0, i );
- }
- return str;
- }
-
public static String stringOrSubstringBeforeFirst( String str, char c ) {
if ( str == null ) {
@@ -76,97 +46,17 @@ public class StringUtils extends org.apache.commons.lang.StringUtils {
}
- public static String stringOrSubstringAfterFirst( String str, char c ) {
- if ( str == null ) {
- return null;
- }
- int i = str.indexOf( c );
- if ( i != -1 ) {
- return str.substring( i + 1 );
- }
- return str;
- }
-
-
- public static String compactWhitespace( String str ) {
- if ( str == null ) {
- return null;
- }
- boolean prevWS = false;
- StringBuilder builder = new StringBuilder();
- for ( int i = 0; i < str.length(); i++ ) {
- char c = str.charAt( i );
- if ( Character.isWhitespace( c ) ) {
- if ( !prevWS ) {
- builder.append( ' ' );
- }
- prevWS = true;
- }
- else {
- prevWS = false;
- builder.append( c );
- }
- }
- return builder.toString().trim();
- }
-
-
- /** @return new string with replace applied */
- public static String replaceAll( String source, String find, String replace ) {
- if ( source == null ) {
- return null;
- }
- while ( true ) {
- String old = source;
- source = source.replaceAll( find, replace );
- if ( source.equals( old ) ) {
- return source;
- }
- }
- }
-
-
public static String toString( Object obj ) {
return string( obj );
}
- public static String toStringFormat( Object obj, String format ) {
- if ( obj != null ) {
- if ( format != null ) {
- if ( obj.getClass().isArray() ) {
- return String.format( format, Arrays.toString( ( Object[] ) obj ) );
- }
- return String.format( format, string( obj ) );
- }
- else {
- return string( obj );
- }
- }
- return "";
- }
-
-
- public static boolean isString( Object obj ) {
- return obj instanceof String;
- }
-
-
- public static boolean isStringOrNull( Object obj ) {
- if ( obj == null ) {
- return true;
- }
- return obj instanceof String;
- }
-
-
- public static String readClasspathFileAsString( String filePath ) {
- try {
- return IOUtils.toString( StringUtils.class.getResourceAsStream( filePath ) );
- }
- catch ( Exception e ) {
- LOG.error( "Error getting file from classpath: " + filePath, e );
- }
- return null;
+ /**
+ * Remove dashes from our uuid
+ * @param uuid
+ * @return
+ */
+ public static String sanitizeUUID(final UUID uuid){
+ return uuid.toString().replace( "-", "" );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d95ebba/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/UUIDUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/UUIDUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/UUIDUtils.java
index fdffaef..b9b407b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/UUIDUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/UUIDUtils.java
@@ -57,10 +57,6 @@ public class UUIDUtils {
private static ReentrantLock tsLock = new ReentrantLock( true );
- public static final UUID MIN_TIME_UUID = UUID.fromString( "00000000-0000-1000-8000-000000000000" );
-
- public static final UUID MAX_TIME_UUID = UUID.fromString( "ffffffff-ffff-1fff-bfff-ffffffffffff" );
-
public static final UUID ZERO_UUID = new UUID( 0, 0 );
private static long timestampMillisNow = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d95ebba/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 70ae8c5..a3332a8 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -24,10 +24,7 @@ import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.apache.usergrid.persistence.index.*;
-import org.apache.usergrid.persistence.index.query.CandidateResult;
-import org.apache.usergrid.persistence.index.utils.IndexValidationUtils;
import org.apache.usergrid.persistence.model.field.UUIDField;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -289,8 +286,8 @@ public class EntityIndexTest extends BaseIT {
entityIndex.createBatch().index(indexScope , entity ).execute().get();
entityIndex.refresh();
- CandidateResults candidateResults = entityIndex.search( indexScope, SearchTypes.fromTypes(entity.getId().getType()),
- Query.fromQL( "name contains 'Ferrari*'" ) );
+ CandidateResults candidateResults = entityIndex.search( indexScope,
+ SearchTypes.fromTypes( entity.getId().getType() ), Query.fromQL( "name contains 'Ferrari*'" ) );
assertEquals( 1, candidateResults.size() );
EntityIndexBatch batch = entityIndex.createBatch();
@@ -593,6 +590,98 @@ public class EntityIndexTest extends BaseIT {
assertNotEquals( "cluster should be fine", Health.RED, ei.getIndexHealth() );
assertNotEquals( "cluster should be ready now", Health.RED, ei.getClusterHealth() );
}
+
+
+ @Test
+ public void testCursorFormat() throws Exception {
+
+ Id appId = new SimpleId( "application" );
+ Id ownerId = new SimpleId( "owner" );
+
+ ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
+
+ IndexScope indexScope = new IndexScopeImpl( ownerId, "users" );
+
+
+ EntityIndex entityIndex = eif.createEntityIndex( applicationScope );
+ entityIndex.initializeIndex();
+
+ final EntityIndexBatch batch = entityIndex.createBatch();
+
+
+ final int size = 10;
+
+ final List<Id> entities = new ArrayList<>( size );
+
+
+ for ( int i = 0; i < size; i++ ) {
+ final String middleName = "middleName" + UUIDUtils.newTimeUUID();
+ Map<String, Object> properties = new LinkedHashMap<String, Object>();
+ properties.put( "username", "edanuff" );
+ properties.put( "email", "ed@anuff.com" );
+ properties.put( "middlename", middleName );
+
+ Map entityMap = new HashMap() {{
+ put( "username", "edanuff" );
+ put( "email", "ed@anuff.com" );
+ put( "middlename", middleName );
+ }};
+
+ final Id userId = new SimpleId( "user" );
+
+ Entity user = EntityIndexMapUtils.fromMap( entityMap );
+ EntityUtils.setId( user, userId );
+ EntityUtils.setVersion( user, UUIDGenerator.newTimeUUID() );
+
+ user.setField( new UUIDField( IndexingUtils.ENTITYID_ID_FIELDNAME, UUIDGenerator.newTimeUUID() ) );
+
+ entities.add( userId );
+
+
+ batch.index( indexScope, user );
+ }
+
+
+ batch.execute().get();
+ entityIndex.refresh();
+
+
+ final int limit = 1;
+
+
+ final int expectedPages = size / limit;
+
+
+ String cursor = null;
+
+ for ( int i = 0; i < expectedPages; i++ ) {
+ //**
+ final Query query = Query.fromQL( "select *" );
+ query.setLimit( limit );
+
+ if ( cursor != null ) {
+ query.setCursor( cursor );
+ }
+
+ final CandidateResults results = entityIndex.search( indexScope, SearchTypes.allTypes(), query );
+
+ assertTrue( results.hasCursor() );
+
+ cursor = results.getCursor();
+
+ assertEquals("Should be 16 bytes as hex", 32, cursor.length());
+
+
+
+
+ assertEquals( 1, results.size() );
+
+
+ assertEquals( results.get( 0 ).getId(), entities.get( i ) );
+ }
+ }
+
+
}
[2/5] incubator-usergrid git commit: change timer for buffer consumer
Posted by gr...@apache.org.
change timer for buffer consumer
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0555622f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0555622f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0555622f
Branch: refs/heads/two-dot-o
Commit: 0555622f586b389fc0ee2aecf217f90a8ee16584
Parents: 5cc521f
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Mar 9 13:00:35 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Mar 9 13:00:35 2015 -0600
----------------------------------------------------------------------
.../persistence/index/impl/EsIndexBufferConsumerImpl.java | 10 ++++++++++
1 file changed, 10 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0555622f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index efde88f..94ea71e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -50,6 +50,7 @@ import rx.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Consumer for IndexOperationMessages
@@ -78,6 +79,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
final BlockingQueue<IndexOperationMessage> producerQueue = producer.getSource();
+
+ final AtomicInteger countFail = new AtomicInteger();
//batch up sets of some size and send them in batch
this.consumer = Observable.create(new Observable.OnSubscribe<IndexOperationMessage>() {
@Override
@@ -99,8 +102,15 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
drainList.clear();
timer.stop();
}
+ countFail.set(0);
} catch (InterruptedException ie) {
+ int count = countFail.incrementAndGet();
log.error("failed to dequeue", ie);
+ if(count > 200){
+ log.error("Shutting down index drain due to repetitive failures");
+ //break;
+ }
+
}
} while (true);
}