You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/11 03:38:56 UTC
[1/5] incubator-usergrid git commit: Added buffer wiring to guice and
updated the tests.
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-466 31bee37a4 -> 8d8eb060e
Added buffer wiring to guice and updated the tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cd0015d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cd0015d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cd0015d5
Branch: refs/heads/USERGRID-466
Commit: cd0015d5af1371528943e015022be21b2d8099f9
Parents: c5a4767
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 14:56:47 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 14:56:47 2015 -0600
----------------------------------------------------------------------
.../usergrid/persistence/index/IndexFig.java | 9 ++++++++
.../persistence/index/guice/IndexModule.java | 10 ++++++++-
.../persistence/index/impl/BufferQueue.java | 7 ++++++
.../index/impl/BufferQueueInMemory.java | 18 ++++++++-------
.../index/impl/EsIndexBufferConsumerImpl.java | 23 +++++++++++++++-----
.../index/impl/EsIndexBufferProducerImpl.java | 2 +-
.../index/guice/TestIndexModule.java | 9 +++++++-
7 files changed, 62 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index c6f08f6..befbaa9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -64,6 +64,11 @@ public interface IndexFig extends GuicyFig {
*/
public static final String ELASTICSEARCH_FAIL_REFRESH = "elasticsearch.fail_refresh";
+ /**
+ * Amount of time in milliseconds to wait when ES rejects our request before retrying. Provides simple backpressure
+ */
+ public static final String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait";
+
public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
@Default( "127.0.0.1" )
@@ -158,4 +163,8 @@ public interface IndexFig extends GuicyFig {
@Default("one")
@Key( INDEX_WRITE_CONSISTENCY_LEVEL )
String getWriteConsistencyLevel();
+
+ @Default("1000")
+ @Key( FAILURE_REJECTED_RETRY_WAIT_TIME )
+ long getFailureRetryTime();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index ebd9098..d911dab 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -32,7 +32,7 @@ import org.apache.usergrid.persistence.map.guice.MapModule;
import org.safehaus.guicyfig.GuicyFigModule;
-public class IndexModule extends AbstractModule {
+public abstract class IndexModule extends AbstractModule {
@Override
protected void configure() {
@@ -48,6 +48,14 @@ public class IndexModule extends AbstractModule {
bind(IndexBufferProducer.class).to(EsIndexBufferProducerImpl.class);
bind(IndexBufferConsumer.class).to(EsIndexBufferConsumerImpl.class).asEagerSingleton();
+ wireBufferQueue();
}
+
+ /**
+ * Write the <class>BufferQueue</class> for this implementation
+ */
+ public abstract void wireBufferQueue();
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
index dec6ac3..ffc3b90 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
@@ -46,4 +46,11 @@ public interface BufferQueue {
* @return
*/
public List<IndexOperationMessage> take(final int takeSize, final long timeout, final TimeUnit timeUnit );
+
+
+ /**
+ * Ack all messages so they do not appear again. Meant for transactional queues, and may or may not be implemented
+ * @param messages
+ */
+ public void ack(List<IndexOperationMessage> messages);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
index 502f45d..403762f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
@@ -36,15 +36,11 @@ import com.google.inject.Singleton;
public class BufferQueueInMemory implements BufferQueue {
private final ArrayBlockingQueue<IndexOperationMessage> messages;
- private final IndexFig fig;
@Inject
- public BufferQueueInMemory( final ArrayBlockingQueue<IndexOperationMessage> messages, final IndexFig fig ) {
- this.messages = messages;
-
-
- this.fig = fig;
+ public BufferQueueInMemory(final IndexFig fig ) {
+ messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() );
}
@@ -78,8 +74,14 @@ public class BufferQueueInMemory implements BufferQueue {
}
}
- while ( response.size() < takeSize && System.currentTimeMillis() < endTime );
+ while ( response.size() < takeSize && System.currentTimeMillis() < endTime );
+
+ return response;
+ }
+
- return null;
+ @Override
+ public void ack( final List<IndexOperationMessage> messages ) {
+ //no op for this
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 19b8438..45c12a1 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -39,6 +39,7 @@ import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
import org.elasticsearch.client.Client;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
@@ -70,9 +71,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
private final BufferQueue bufferQueue;
@Inject
- public EsIndexBufferConsumerImpl( final IndexFig config, final IndexBufferProducer producer, final EsProvider
- provider, final MetricsFactory metricsFactory,
- final BufferQueue bufferQueue ){
+ public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider
+ provider, final MetricsFactory metricsFactory, final BufferQueue bufferQueue ){
this.bufferQueue = bufferQueue;
this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "index.buffer.meter");
@@ -101,17 +101,30 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
for ( IndexOperationMessage drained : drainList ) {
subscriber.onNext( drained );
}
- drainList.clear();
+
+ bufferQueue.ack( drainList );
+
timer.stop();
countFail.set( 0 );
}
+ catch( EsRejectedExecutionException err) {
+ countFail.incrementAndGet();
+ log.error( "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying. Failed {} consecutive times", config.getFailRefreshCount(), countFail.get() );
+
+ //es rejected the exception, sleep and retry in the queue
+ try {
+ Thread.sleep( config.getFailureRetryTime() );
+ }
+ catch ( InterruptedException e ) {
+ //swallow
+ }
+ }
catch ( Exception e ) {
int count = countFail.incrementAndGet();
log.error( "failed to dequeue", e );
if ( count > 200 ) {
log.error( "Shutting down index drain due to repetitive failures" );
- //break;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
index f9999b2..db1f50e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
@@ -46,7 +46,7 @@ public class EsIndexBufferProducerImpl implements IndexBufferProducer {
private final BufferQueue bufferQueue;
@Inject
- public EsIndexBufferProducerImpl( MetricsFactory metricsFactory, IndexFig fig, final BufferQueue bufferQueue ){
+ public EsIndexBufferProducerImpl( MetricsFactory metricsFactory, final BufferQueue bufferQueue ){
this.bufferQueue = bufferQueue;
this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerImpl.class, "index.buffer.size");
this.timer = metricsFactory.getTimer(EsIndexBufferProducerImpl.class,"index.buffer.producer.timer");
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 4d68dda..7e2312d 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -22,6 +22,8 @@ package org.apache.usergrid.persistence.index.guice;
import org.apache.usergrid.persistence.collection.guice.CollectionModule;
import org.apache.usergrid.persistence.core.guice.TestModule;
import org.apache.usergrid.persistence.core.guice.CommonModule;
+import org.apache.usergrid.persistence.index.impl.BufferQueue;
+import org.apache.usergrid.persistence.index.impl.BufferQueueInMemory;
public class TestIndexModule extends TestModule {
@@ -32,6 +34,11 @@ public class TestIndexModule extends TestModule {
// configure collections and our core astyanax framework
install( new CollectionModule() );
- install( new IndexModule() );
+ install( new IndexModule() {
+ @Override
+ public void wireBufferQueue() {
+ bind( BufferQueue.class).to( BufferQueueInMemory.class );
+ }
+ } );
}
}
[5/5] incubator-usergrid git commit: Refactored to write to cassandra
for data, and to SQS for the id of the data.
Posted by to...@apache.org.
Refactored to write to cassandra for data, and to SQS for the id of the data.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8d8eb060
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8d8eb060
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8d8eb060
Branch: refs/heads/USERGRID-466
Commit: 8d8eb060e20bd9942d591227ae2cdd5337214b81
Parents: 9111d94
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 20:38:52 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 20:38:52 2015 -0600
----------------------------------------------------------------------
.../usergrid/persistence/map/MapManager.java | 35 ++--
.../persistence/map/impl/MapManagerImpl.java | 8 +
.../persistence/map/impl/MapSerialization.java | 9 +
.../map/impl/MapSerializationImpl.java | 93 +++++++++
.../persistence/map/MapManagerTest.java | 49 ++++-
stack/corepersistence/queryindex/pom.xml | 1 -
.../index/IndexOperationMessage.java | 36 +++-
.../index/impl/BufferQueueSQSImpl.java | 167 ++++++++++++++--
.../persistence/index/impl/DeIndexRequest.java | 10 +-
.../index/impl/EsIndexBufferConsumerImpl.java | 196 +++++++++++--------
.../index/guice/TestIndexModule.java | 4 +-
.../index/impl/BufferQueueSQSImplTest.java | 145 ++++++++++++++
.../impl/EntityConnectionIndexImplTest.java | 4 +-
.../persistence/index/impl/EntityIndexTest.java | 21 +-
.../queue/impl/SQSQueueManagerImpl.java | 32 ++-
15 files changed, 656 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
index 62fe57d..69e0874 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
@@ -1,23 +1,26 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.usergrid.persistence.map;
+import java.util.Collection;
+import java.util.Map;
import java.util.UUID;
@@ -33,6 +36,14 @@ public interface MapManager {
*/
public String getString( final String key );
+
+ /**
+ * Get the values for all the keys. If a value does not exist, it won't be present in the map
+ * @param keys
+ * @return
+ */
+ public Map<String, String> getStrings(final Collection<String> keys);
+
/**
* Return the string, null if not found
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
index c077c7d..fb2e7ff 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
@@ -18,6 +18,8 @@
package org.apache.usergrid.persistence.map.impl;
+import java.util.Collection;
+import java.util.Map;
import java.util.UUID;
import org.apache.usergrid.persistence.map.MapManager;
@@ -51,6 +53,12 @@ public class MapManagerImpl implements MapManager {
@Override
+ public Map<String, String> getStrings( final Collection<String> keys ) {
+ return mapSerialization.getStrings( scope, keys );
+ }
+
+
+ @Override
public void putString( final String key, final String value ) {
mapSerialization.putString( scope, key, value );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
index 6e7e328..2e958c2 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.persistence.map.impl;
+import java.util.Collection;
+import java.util.Map;
import java.util.UUID;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
@@ -33,6 +35,13 @@ public interface MapSerialization extends Migration {
public String getString( final MapScope scope, final String key );
/**
+ * Get strings from the map
+ * @param keys
+ * @return
+ */
+ public Map<String, String> getStrings( final MapScope scope, final Collection<String> keys );
+
+ /**
* Return the string, null if not found
*/
public void putString( final MapScope scope, final String key, final String value );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index 715c202..825d636 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -18,9 +18,12 @@
*/
package org.apache.usergrid.persistence.map.impl;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import com.google.common.base.Preconditions;
@@ -50,6 +53,9 @@ import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.CompositeBuilder;
import com.netflix.astyanax.model.CompositeParser;
+import com.netflix.astyanax.model.Row;
+import com.netflix.astyanax.model.Rows;
+import com.netflix.astyanax.query.ColumnFamilyQuery;
import com.netflix.astyanax.serializers.BooleanSerializer;
import com.netflix.astyanax.serializers.StringSerializer;
@@ -73,6 +79,9 @@ public class MapSerializationImpl implements MapSerialization {
private static final StringSerializer STRING_SERIALIZER = StringSerializer.get();
+ private static final StringResultsBuilder STRING_RESULTS_BUILDER = new StringResultsBuilder();
+
+
/**
* CFs where the row key contains the source node id
*/
@@ -126,6 +135,12 @@ public class MapSerializationImpl implements MapSerialization {
@Override
+ public Map<String, String> getStrings(final MapScope scope, final Collection<String> keys ) {
+ return getValues( scope, keys, STRING_RESULTS_BUILDER );
+ }
+
+
+ @Override
public void putString( final MapScope scope, final String key, final String value ) {
final RowOp op = new RowOp() {
@Override
@@ -371,6 +386,49 @@ public class MapSerializationImpl implements MapSerialization {
}
+ /**
+ * Get multiple values, using the string builder
+ * @param scope
+ * @param keys
+ * @param builder
+ * @param <T>
+ * @return
+ */
+ private <T> T getValues(final MapScope scope, final Collection<String> keys, final ResultsBuilder<T> builder) {
+
+
+ final List<ScopedRowKey<MapEntryKey>> rowKeys = new ArrayList<>( keys.size() );
+
+ for(final String key: keys){
+ //add it to the entry
+ final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+
+ rowKeys.add( entryRowKey );
+
+ }
+
+
+
+ //now get all columns, including the "old row key value"
+ try {
+ final Rows<ScopedRowKey<MapEntryKey>, Boolean>
+ rows = keyspace.prepareQuery( MAP_ENTRIES ).getKeySlice( rowKeys ).withColumnSlice( true )
+ .execute().getResult();
+
+
+ return builder.buildResults( rows );
+ }
+ catch ( NotFoundException nfe ) {
+ //nothing to return
+ return null;
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to cassandra", e );
+ }
+ }
+
+
+
private void executeBatch(MutationBatch batch) {
try {
batch.execute();
@@ -449,4 +507,39 @@ public class MapSerializationImpl implements MapSerialization {
return ScopedRowKey.fromKey( mapScope.getApplication(), new MapEntryKey( mapScope.getName(), key ) );
}
}
+
+
+ /**
+ * Build the results from the row keys
+ * @param <T>
+ */
+ private static interface ResultsBuilder<T> {
+
+ public T buildResults(final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows);
+ }
+
+ public static class StringResultsBuilder implements ResultsBuilder<Map<String, String>>{
+
+ @Override
+ public Map<String, String> buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows ) {
+ final int size = rows.size();
+
+ final Map<String, String> results = new HashMap<>(size);
+
+ for(int i = 0; i < size; i ++){
+
+ final Row<ScopedRowKey<MapEntryKey>, Boolean> row = rows.getRowByIndex( i );
+
+ final String value = row.getColumns().getStringValue( true, null );
+
+ if(value == null){
+ continue;
+ }
+
+ results.put( row.getKey().getKey().key, value );
+ }
+
+ return results;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
index df4394e..41286ab 100644
--- a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
+++ b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
@@ -20,6 +20,9 @@
package org.apache.usergrid.persistence.map;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -34,9 +37,11 @@ import org.apache.usergrid.persistence.core.test.UseModules;
import org.apache.usergrid.persistence.map.guice.TestMapModule;
import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.google.inject.Inject;
+import static junit.framework.TestCase.assertNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -79,6 +84,47 @@ public class MapManagerTest {
@Test
+ public void multiReadNoKey() {
+ MapManager mm = mmf.createMapManager( this.scope );
+
+ final String key = UUIDGenerator.newTimeUUID().toString();
+
+ final Map<String, String> results = mm.getStrings( Collections.singleton( key ) );
+
+ assertNotNull( results );
+
+ final String shouldBeMissing = results.get( key );
+
+ assertNull( shouldBeMissing );
+ }
+
+
+ @Test
+ public void writeReadStringBatch() {
+ MapManager mm = mmf.createMapManager( this.scope );
+
+ final String key1 = "key1";
+ final String value1 = "value1";
+
+ mm.putString( key1, value1 );
+
+
+ final String key2 = "key2";
+ final String value2 = "value2";
+
+ mm.putString( key2, value2 );
+
+
+ final Map<String, String> returned = mm.getStrings( Arrays.asList( key1, key2 ) );
+
+ assertNotNull( returned );
+
+ assertEquals( value1, returned.get( key1 ) );
+ assertEquals( value2, returned.get( key2 ) );
+ }
+
+
+ @Test
public void writeReadStringTTL() throws InterruptedException {
MapManager mm = mmf.createMapManager( this.scope );
@@ -106,8 +152,7 @@ public class MapManagerTest {
//now read it should be gone
final String timedOut = mm.getString( key );
- assertNull("Value was not returned", timedOut);
-
+ assertNull( "Value was not returned", timedOut );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/pom.xml b/stack/corepersistence/queryindex/pom.xml
index a5fbf6a..af843ad 100644
--- a/stack/corepersistence/queryindex/pom.xml
+++ b/stack/corepersistence/queryindex/pom.xml
@@ -145,7 +145,6 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
- <scope>test</scope>
</dependency>
<!-- common stuff, logging, etc.-->
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 7d8a859..a7388d6 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -60,17 +60,17 @@ public class IndexOperationMessage implements Serializable {
public void addAllIndexRequest( final Set<IndexRequest> indexRequests ) {
- indexRequests.addAll( indexRequests );
+ this.indexRequests.addAll( indexRequests );
}
public void addDeIndexRequest( final DeIndexRequest deIndexRequest ) {
- deIndexRequests.add( deIndexRequest );
+ this.deIndexRequests.add( deIndexRequest );
}
public void addAllDeIndexRequest( final Set<DeIndexRequest> deIndexRequests ) {
- deIndexRequests.addAll( deIndexRequests );
+ this.deIndexRequests.addAll( deIndexRequests );
}
@@ -96,4 +96,34 @@ public class IndexOperationMessage implements Serializable {
public BetterFuture<IndexOperationMessage> getFuture() {
return containerFuture;
}
+
+
+ @Override
+ public boolean equals( final Object o ) {
+ if ( this == o ) {
+ return true;
+ }
+ if ( o == null || getClass() != o.getClass() ) {
+ return false;
+ }
+
+ final IndexOperationMessage that = ( IndexOperationMessage ) o;
+
+ if ( !deIndexRequests.equals( that.deIndexRequests ) ) {
+ return false;
+ }
+ if ( !indexRequests.equals( that.indexRequests ) ) {
+ return false;
+ }
+
+ return true;
+ }
+
+
+ @Override
+ public int hashCode() {
+ int result = indexRequests.hashCode();
+ result = 31 * result + deIndexRequests.hashCode();
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
index 833e045..c6acb36 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
@@ -23,56 +23,132 @@ package org.apache.usergrid.persistence.index.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.IndexFig;
import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import org.apache.usergrid.persistence.queue.QueueMessage;
import org.apache.usergrid.persistence.queue.QueueScope;
import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+/**
+ * This is experimental at best. Our SQS size limit is a problem. We shouldn't use this for index operation. Only for
+ * performing
+ */
@Singleton
public class BufferQueueSQSImpl implements BufferQueue {
+ private static final Logger logger = LoggerFactory.getLogger( BufferQueueSQSImpl.class );
+
/** Hacky, copied from CPEntityManager b/c we can't access it here */
public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-11ddb1de66c8" );
+ /**
+ * Set our TTL to 1 month. This is high, but in the event of a bug, we want these entries to get removed
+ */
+ public static final int TTL = 60 * 60 * 24 * 30;
+
+ /**
+ * The name to put in the map
+ */
+ public static final String MAP_NAME = "esqueuedata";
+
+
private static final String QUEUE_NAME = "es_queue";
+ private static SmileFactory SMILE_FACTORY = new SmileFactory();
+
+
+ static {
+ SMILE_FACTORY.delegateToTextual( true );
+ }
+
+
private final QueueManager queue;
+ private final MapManager mapManager;
private final IndexFig indexFig;
+ private final ObjectMapper mapper;
+ private final Meter readMeter;
+ private final Timer readTimer;
+ private final Meter writeMeter;
+ private final Timer writeTimer;
@Inject
- public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final IndexFig indexFig ) {
- final QueueScope scope =
+ public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final IndexFig indexFig,
+ final MapManagerFactory mapManagerFactory, final MetricsFactory metricsFactory ) {
+ final QueueScope queueScope =
new QueueScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), QUEUE_NAME );
- this.queue = queueManagerFactory.getQueueManager( scope );
+ this.queue = queueManagerFactory.getQueueManager( queueScope );
this.indexFig = indexFig;
+
+ final MapScope scope = new MapScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), MAP_NAME );
+
+ this.mapManager = mapManagerFactory.createMapManager( scope );
+
+
+ this.writeTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "write.timer" );
+ this.writeMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "write.meter" );
+
+ this.readTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "read.timer" );
+ this.readMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "read.meter" );
+
+ this.mapper = new ObjectMapper( SMILE_FACTORY );
+ //pretty print, disabling for speed
+ // mapper.enable(SerializationFeature.INDENT_OUTPUT);
+
}
@Override
public void offer( final IndexOperationMessage operation ) {
+ final Timer.Context timer = this.writeTimer.time();
+ this.writeMeter.mark();
+
+ final UUID identifier = UUIDGenerator.newTimeUUID();
try {
- this.queue.sendMessage( operation );
+
+ final String payLoad = toString( operation );
+
+ //write to cassandra
+ this.mapManager.putString( identifier.toString(), payLoad, TTL );
+
+ //signal to SQS
+ this.queue.sendMessage( identifier );
operation.getFuture().run();
}
catch ( IOException e ) {
throw new RuntimeException( "Unable to queue message", e );
}
+ finally {
+ timer.stop();
+ }
}
@@ -83,23 +159,71 @@ public class BufferQueueSQSImpl implements BufferQueue {
final int actualTake = Math.min( 10, takeSize );
- List<QueueMessage> messages = queue
- .getMessages( actualTake, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
- IndexOperationMessage.class );
+ final Timer.Context timer = this.readTimer.time();
+
+ try {
+ List<QueueMessage> messages = queue
+ .getMessages( actualTake, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
+ String.class );
- final List<IndexOperationMessage> response = new ArrayList<>( messages.size() );
- for ( final QueueMessage message : messages ) {
- final IndexOperationMessage messageBody = ( IndexOperationMessage ) message.getBody();
+ final List<IndexOperationMessage> response = new ArrayList<>( messages.size() );
- SqsIndexOperationMessage operation = new SqsIndexOperationMessage(message, messageBody );
+ final List<String> mapEntries = new ArrayList<>( messages.size() );
- response.add( operation );
- }
- return response;
+ if(messages.size() == 0){
+ return response;
+ }
+
+ //add all our keys for a single round trip
+ for ( final QueueMessage message : messages ) {
+ mapEntries.add( message.getBody().toString() );
+ }
+
+ //look up the values
+ final Map<String, String> values = mapManager.getStrings( mapEntries );
+
+
+ //load them into our response
+ for ( final QueueMessage message : messages ) {
+
+ final String key = message.getBody().toString();
+
+ //now see if the key was there
+ final String payload = values.get( key );
+
+ //the entry was not present in cassandra, ignore this message. Failure should eventually kick it to
+ // a DLQ
+
+ if ( payload == null ) {
+ continue;
+ }
+
+ final IndexOperationMessage messageBody;
+
+ try {
+ messageBody = fromString( payload );
+ }
+ catch ( IOException e ) {
+ logger.error( "Unable to deserialize message from string. This is a bug", e );
+ throw new RuntimeException( "Unable to deserialize message from string. This is a bug", e );
+ }
+
+ SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message, messageBody );
+
+ response.add( operation );
+ }
+
+ readMeter.mark( response.size() );
+ return response;
+ }
+ //stop our timer
+ finally {
+ timer.stop();
+ }
}
@@ -107,7 +231,7 @@ public class BufferQueueSQSImpl implements BufferQueue {
public void ack( final List<IndexOperationMessage> messages ) {
//nothing to do
- if(messages.size() == 0){
+ if ( messages.size() == 0 ) {
return;
}
@@ -121,6 +245,19 @@ public class BufferQueueSQSImpl implements BufferQueue {
}
+ /** Read the object from Base64 string. */
+ private IndexOperationMessage fromString( String s ) throws IOException {
+ IndexOperationMessage o = mapper.readValue( s, IndexOperationMessage.class );
+ return o;
+ }
+
+
+ /** Write the object to a Base64 string. */
+ private String toString( IndexOperationMessage o ) throws IOException {
+ return mapper.writeValueAsString( o );
+ }
+
+
/**
* The message that subclasses our IndexOperationMessage. holds a pointer to the original message
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
index c63c4df..9f3ce66 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
@@ -37,9 +37,9 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
public class DeIndexRequest implements BatchRequest {
- public final String[] indexes;
- public final String entityType;
- public final String documentId;
+ public String[] indexes;
+ public String entityType;
+ public String documentId;
public DeIndexRequest( final String[] indexes, final String entityType, final String documentId) {
@@ -49,6 +49,10 @@ public class DeIndexRequest implements BatchRequest {
}
+ public DeIndexRequest() {
+ }
+
+
@Override
public void doOperation(final Client client, final BulkRequestBuilder bulkRequest ){
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 8547889..3fc3e77 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -42,7 +42,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
+import rx.Subscription;
import rx.functions.Action1;
+import rx.functions.Action2;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
@@ -60,17 +62,23 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
private final IndexFig config;
private final FailureMonitorImpl failureMonitor;
private final Client client;
- private final Observable<List<IndexOperationMessage>> consumer;
+
private final Timer flushTimer;
private final Counter indexSizeCounter;
private final Meter flushMeter;
private final Timer produceTimer;
private final BufferQueue bufferQueue;
+ //the actively running subscription
+ private Subscription subscription;
+
+ private Observable<List<IndexOperationMessage>> consumer;
+
+ private Object mutex = new Object();
+
@Inject
public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider
provider, final MetricsFactory metricsFactory, final BufferQueue bufferQueue ){
- this.bufferQueue = bufferQueue;
this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "index.buffer.meter");
this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index.buffer.size");
@@ -78,81 +86,101 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
this.failureMonitor = new FailureMonitorImpl(config,provider);
this.client = provider.getClient();
this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
+ this.bufferQueue = bufferQueue;
+
- final AtomicInteger countFail = new AtomicInteger();
//batch up sets of some size and send them in batch
- this.consumer = Observable.create( new Observable.OnSubscribe<List<IndexOperationMessage>>() {
- @Override
- public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
+ start();
+ }
+
+
+ public void start() {
+ synchronized ( mutex) {
- //name our thread so it's easy to see
- Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
+ final AtomicInteger countFail = new AtomicInteger();
- List<IndexOperationMessage> drainList;
- do {
- try {
+ Observable<List<IndexOperationMessage>> consumer = Observable.create( new Observable.OnSubscribe<List<IndexOperationMessage>>() {
+ @Override
+ public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
+ //name our thread so it's easy to see
+ Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
- Timer.Context timer = produceTimer.time();
+ List<IndexOperationMessage> drainList;
+ do {
+ try {
- drainList = bufferQueue
- .take( config.getIndexBufferSize(), config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS );
+ Timer.Context timer = produceTimer.time();
- subscriber.onNext( drainList );
+ drainList = bufferQueue.take( config.getIndexBufferSize(), config.getIndexBufferTimeout(),
+ TimeUnit.MILLISECONDS );
- timer.stop();
+ subscriber.onNext( drainList );
- countFail.set( 0 );
- }
- catch ( EsRejectedExecutionException err ) {
- countFail.incrementAndGet();
- log.error(
- "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying. "
- + "Failed {} consecutive times",
- config.getFailRefreshCount(), countFail.get() );
-
- //es rejected the exception, sleep and retry in the queue
- try {
- Thread.sleep( config.getFailureRetryTime() );
+
+ timer.stop();
+
+ countFail.set( 0 );
}
- catch ( InterruptedException e ) {
- //swallow
+ catch ( EsRejectedExecutionException err ) {
+ countFail.incrementAndGet();
+ log.error(
+ "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying. " + "Failed {} consecutive times", config.getFailRefreshCount(),
+ countFail.get() );
+
+ //es rejected the exception, sleep and retry in the queue
+ try {
+ Thread.sleep( config.getFailureRetryTime() );
+ }
+ catch ( InterruptedException e ) {
+ //swallow
+ }
}
- }
- catch ( Exception e ) {
- int count = countFail.incrementAndGet();
- log.error( "failed to dequeue", e );
- if ( count > 200 ) {
- log.error( "Shutting down index drain due to repetitive failures" );
+ catch ( Exception e ) {
+ int count = countFail.incrementAndGet();
+ log.error( "failed to dequeue", e );
+ if ( count > 200 ) {
+ log.error( "Shutting down index drain due to repetitive failures" );
+ }
}
}
+ while ( true );
}
- while ( true );
- }
- } ).subscribeOn( Schedulers.newThread() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
- @Override
- public void call( List<IndexOperationMessage> containerList ) {
- if ( containerList.size() > 0 ) {
- flushMeter.mark( containerList.size() );
- Timer.Context time = flushTimer.time();
- execute( containerList );
- time.stop();
+ } ).subscribeOn( Schedulers.newThread() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
+ @Override
+ public void call( List<IndexOperationMessage> containerList ) {
+ if ( containerList.size() > 0 ) {
+ flushMeter.mark( containerList.size() );
+ Timer.Context time = flushTimer.time();
+ execute( containerList );
+ time.stop();
+ }
}
+ } )
+ //ack after we process
+ .doOnNext( new Action1<List<IndexOperationMessage>>() {
+ @Override
+ public void call( final List<IndexOperationMessage> indexOperationMessages ) {
+ bufferQueue.ack( indexOperationMessages );
+ }
+ } );
+
+ //start in the background
+ subscription = consumer.subscribe();
+ }
+ }
+
+
+ public void stop() {
+ synchronized ( mutex ) {
+ //stop consuming
+ if(subscription != null) {
+ subscription.unsubscribe();
}
-} )
- //ack after we process
- .doOnNext( new Action1<List<IndexOperationMessage>>() {
- @Override
- public void call( final List<IndexOperationMessage> indexOperationMessages ) {
- bufferQueue.ack( indexOperationMessages );
- }
- } );
-
- //start in the background
- consumer.subscribe();
+ }
}
/**
@@ -165,43 +193,36 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
}
//process and flatten all the messages to builder requests
- Observable<BatchRequest> flattenMessages = Observable.from(operationMessages)
- .subscribeOn(Schedulers.io())
- .flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
- @Override
- public Observable<BatchRequest> call( IndexOperationMessage operationMessage ) {
- final Observable<DeIndexRequest> deIndex = Observable.from( operationMessage.getDeIndexRequests () );
- final Observable<IndexRequest> index = Observable.from( operationMessage.getIndexRequests() );
-
- return Observable.merge( deIndex, index );
- }
- } );
-
+ Observable<IndexOperationMessage> flattenMessages = Observable.from( operationMessages );
//batch shard operations into a bulk request
- flattenMessages.toList()
- .doOnNext(new Action1<List<BatchRequest>>() {
- @Override
- public void call(List<BatchRequest> builders) {
- try {
- final BulkRequestBuilder bulkRequest = initRequest();
- for (BatchRequest builder : builders) {
- indexSizeCounter.dec();
+ flattenMessages.flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
+ @Override
+ public Observable<BatchRequest> call( final IndexOperationMessage indexOperationMessage ) {
+ final Observable<IndexRequest> index = Observable.from( indexOperationMessage.getIndexRequests() );
+ final Observable<DeIndexRequest> deIndex = Observable.from( indexOperationMessage.getDeIndexRequests() );
- builder.doOperation( client, bulkRequest );
- }
- sendRequest(bulkRequest);
- }catch (Exception e){
- log.error("Failed while sending bulk",e);
- }
- }
- })
- .toBlocking().lastOrDefault(null);
+ return Observable.merge( index, deIndex );
+ }
+ } )
+ //collection all the operations into a single stream
+ .collect( initRequest(), new Action2<BulkRequestBuilder, BatchRequest>() {
+ @Override
+ public void call( final BulkRequestBuilder bulkRequestBuilder, final BatchRequest batchRequest ) {
+ batchRequest.doOperation( client, bulkRequestBuilder );
+ }
+ } )
+ //send the request off to ES
+ .doOnNext( new Action1<BulkRequestBuilder>() {
+ @Override
+ public void call( final BulkRequestBuilder bulkRequestBuilder ) {
+ sendRequest( bulkRequestBuilder );
+ }
+ } ).toBlocking().last();
//call back all futures
Observable.from(operationMessages)
- .subscribeOn(Schedulers.io())
.doOnNext(new Action1<IndexOperationMessage>() {
@Override
public void call(IndexOperationMessage operationMessage) {
@@ -211,6 +232,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
.toBlocking().lastOrDefault(null);
}
+
/**
* initialize request
* @return
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 57c2fab..7d7a18d 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -38,8 +38,8 @@ public class TestIndexModule extends TestModule {
install( new IndexModule() {
@Override
public void wireBufferQueue() {
- bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
-// bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
+// bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
+ bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
}
} );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
new file mode 100644
index 0000000..4a4672e
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.guice.TestIndexModule;
+
+import com.google.inject.Inject;
+
+import net.jcip.annotations.NotThreadSafe;
+
+import static org.junit.Assert.*;
+
+
+
+
+@RunWith(EsRunner.class)
+@UseModules({ TestIndexModule.class })
+@NotThreadSafe
+public class BufferQueueSQSImplTest {
+
+
+ @Inject
+ @Rule
+ public MigrationManagerRule migrationManagerRule;
+
+ @Inject
+ private BufferQueueSQSImpl bufferQueueSQS;
+
+ @Inject
+ private EsIndexBufferConsumerImpl esIndexBufferConsumer;
+
+
+ @Before
+ public void stop() {
+ esIndexBufferConsumer.stop();
+ }
+
+
+ @After
+ public void after() {
+ esIndexBufferConsumer.start();
+ }
+
+
+
+
+ @Test
+ public void testMessageIndexing(){
+
+ final Map<String, Object> request1Data = new HashMap<String, Object>() {{put("test", "testval1");}};
+ final IndexRequest indexRequest1 = new IndexRequest( "testAlias1", "testType1", "testDoc1",request1Data );
+
+
+ final Map<String, Object> request2Data = new HashMap<String, Object>() {{put("test", "testval2");}};
+ final IndexRequest indexRequest2 = new IndexRequest( "testAlias2", "testType2", "testDoc2",request2Data );
+
+
+ //de-index request
+ final DeIndexRequest deIndexRequest1 = new DeIndexRequest( new String[]{"index1.1, index1.2"}, "testType3", "testId3" );
+
+ final DeIndexRequest deIndexRequest2 = new DeIndexRequest( new String[]{"index2.1", "index2.1"}, "testType4", "testId4" );
+
+
+
+
+ IndexOperationMessage indexOperationMessage = new IndexOperationMessage();
+ indexOperationMessage.addIndexRequest( indexRequest1);
+ indexOperationMessage.addIndexRequest( indexRequest2);
+
+ indexOperationMessage.addDeIndexRequest( deIndexRequest1 );
+ indexOperationMessage.addDeIndexRequest( deIndexRequest2 );
+
+ bufferQueueSQS.offer( indexOperationMessage );
+
+ //wait for it to send to SQS
+ indexOperationMessage.getFuture().get();
+
+ //now get it back
+
+ final List<IndexOperationMessage> ops = bufferQueueSQS.take( 10, 20, TimeUnit.SECONDS );
+
+ assertTrue(ops.size() > 1);
+
+ final IndexOperationMessage returnedOperation = ops.get( 0 );
+
+ //get the operations out
+
+ final Set<IndexRequest> indexRequestSet = returnedOperation.getIndexRequests();
+
+ assertTrue(indexRequestSet.contains(indexRequest1));
+ assertTrue(indexRequestSet.contains(indexRequest2));
+
+
+ final Set<DeIndexRequest> deIndexRequests = returnedOperation.getDeIndexRequests();
+
+ assertTrue( deIndexRequests.contains( deIndexRequest1 ) );
+ assertTrue( deIndexRequests.contains( deIndexRequest2 ) );
+
+
+
+ //now ack the message
+
+ bufferQueueSQS.ack( ops );
+
+ }
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
index 215ff57..c5f3488 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
@@ -141,7 +141,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
EsTestUtils.waitForTasks(personLikesIndex);
- Thread.sleep( 30000 );
+ Thread.sleep( 2000 );
// now, let's search for muffins
CandidateResults likes = personLikesIndex
@@ -271,7 +271,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
personLikesIndex.refresh();
EsTestUtils.waitForTasks( personLikesIndex );
- Thread.sleep( 30000 );
+ Thread.sleep( 2000 );
// now, let's search for muffins
CandidateResults likes = personLikesIndex.search( searchScope,
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index a15053c..a2135a3 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -24,12 +24,14 @@ import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.persistence.model.field.ArrayField;
import org.apache.usergrid.persistence.model.field.EntityObjectField;
import org.apache.usergrid.persistence.model.field.UUIDField;
import org.apache.usergrid.persistence.model.field.value.EntityObject;
import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
@@ -69,8 +71,14 @@ public class EntityIndexTest extends BaseIT {
@Inject
public EntityIndexFactory eif;
+ //TODO T.N. Remove this when we move the cursor mapping back to core
+ @Inject
+ @Rule
+ public MigrationManagerRule migrationManagerRule;
+
+
@Test
- public void testIndex() throws IOException {
+ public void testIndex() throws IOException, InterruptedException {
Id appId = new SimpleId( "application" );
ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
@@ -86,7 +94,9 @@ public class EntityIndexTest extends BaseIT {
entityIndex.refresh();
- testQueries( indexScope, searchTypes, entityIndex );
+ Thread.sleep(100000000);
+
+ testQueries( indexScope, searchTypes, entityIndex );
}
@Test
@@ -660,15 +670,12 @@ public class EntityIndexTest extends BaseIT {
for ( int i = 0; i < size; i++ ) {
final String middleName = "middleName" + UUIDUtils.newTimeUUID();
- Map<String, Object> properties = new LinkedHashMap<String, Object>();
- properties.put( "username", "edanuff" );
- properties.put( "email", "ed@anuff.com" );
- properties.put( "middlename", middleName );
Map entityMap = new HashMap() {{
put( "username", "edanuff" );
put( "email", "ed@anuff.com" );
put( "middlename", middleName );
+ put( "created", System.nanoTime() );
}};
final Id userId = new SimpleId( "user" );
@@ -700,7 +707,7 @@ public class EntityIndexTest extends BaseIT {
for ( int i = 0; i < expectedPages; i++ ) {
//**
- final Query query = Query.fromQL( "select *" );
+ final Query query = Query.fromQL( "select * order by created" );
query.setLimit( limit );
if ( cursor != null ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 10aa621..e2c5c1e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -48,20 +48,19 @@ import java.util.concurrent.TimeUnit;
public class SQSQueueManagerImpl implements QueueManager {
private static final Logger LOG = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
- private AmazonSQSClient sqs;
- private QueueScope scope;
- private QueueFig fig;
+ private final AmazonSQSClient sqs;
+ private final QueueScope scope;
private ObjectMapper mapper;
private static SmileFactory smileFactory = new SmileFactory();
- private static LoadingCache<SqsLoader, Queue> urlMap = CacheBuilder.newBuilder()
+ private LoadingCache<SqsLoader, Queue> urlMap = CacheBuilder.newBuilder()
.maximumSize(1000)
.build(new CacheLoader<SqsLoader, Queue>() {
@Override
public Queue load(SqsLoader queueLoader) throws Exception {
Queue queue = null;
try {
- GetQueueUrlResult result = queueLoader.getClient().getQueueUrl(queueLoader.getKey());
+ GetQueueUrlResult result = sqs.getQueueUrl(queueLoader.getKey());
queue = new Queue(result.getQueueUrl());
} catch (QueueDoesNotExistException queueDoesNotExistException) {
queue = null;
@@ -73,7 +72,7 @@ public class SQSQueueManagerImpl implements QueueManager {
String name = queueLoader.getKey();
CreateQueueRequest createQueueRequest = new CreateQueueRequest()
.withQueueName(name);
- CreateQueueResult result = queueLoader.getClient().createQueue(createQueueRequest);
+ CreateQueueResult result = sqs.createQueue(createQueueRequest);
String url = result.getQueueUrl();
queue = new Queue(url);
LOG.info("Created queue with url {}", url);
@@ -85,7 +84,6 @@ public class SQSQueueManagerImpl implements QueueManager {
@Inject
public SQSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
- this.fig = fig;
this.scope = scope;
try {
UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
@@ -99,8 +97,7 @@ public class SQSQueueManagerImpl implements QueueManager {
// mapper.enable(SerializationFeature.INDENT_OUTPUT);
mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
} catch ( Exception e ) {
- LOG.warn("failed to setup SQS",e);
-// throw new RuntimeException("Error setting up mapper", e);
+ throw new RuntimeException("Error setting up mapper", e);
}
}
@@ -127,14 +124,14 @@ public class SQSQueueManagerImpl implements QueueManager {
}
waitTime = waitTime/1000;
String url = getQueue().getUrl();
- LOG.info("Getting {} messages from {}", limit, url);
+ LOG.debug( "Getting {} messages from {}", limit, url);
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
receiveMessageRequest.setMaxNumberOfMessages(limit);
receiveMessageRequest.setVisibilityTimeout(transactionTimeout);
receiveMessageRequest.setWaitTimeSeconds(waitTime);
ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
List<Message> messages = result.getMessages();
- LOG.info("Received {} messages from {}",messages.size(),url);
+ LOG.debug( "Received {} messages from {}", messages.size(), url);
List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
for (Message message : messages) {
Object body ;
@@ -157,7 +154,7 @@ public class SQSQueueManagerImpl implements QueueManager {
return;
}
String url = getQueue().getUrl();
- LOG.info("Sending Messages...{} to {}", bodies.size(), url);
+ LOG.debug( "Sending Messages...{} to {}", bodies.size(), url);
SendMessageBatchRequest request = new SendMessageBatchRequest(url);
List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
@@ -180,7 +177,7 @@ public class SQSQueueManagerImpl implements QueueManager {
return;
}
String url = getQueue().getUrl();
- LOG.info("Sending Message...{} to {}",body.toString(),url);
+ LOG.debug( "Sending Message...{} to {}", body.toString(), url);
final String stringBody = toString(body);
@@ -192,7 +189,7 @@ public class SQSQueueManagerImpl implements QueueManager {
@Override
public void commitMessage(QueueMessage queueMessage) {
String url = getQueue().getUrl();
- LOG.info("Commit message {} to queue {}",queueMessage.getMessageId(),url);
+ LOG.debug( "Commit message {} to queue {}", queueMessage.getMessageId(), url);
sqs.deleteMessage(new DeleteMessageRequest()
.withQueueUrl(url)
@@ -203,7 +200,7 @@ public class SQSQueueManagerImpl implements QueueManager {
@Override
public void commitMessages(List<QueueMessage> queueMessages) {
String url = getQueue().getUrl();
- LOG.info("Commit messages {} to queue {}",queueMessages.size(),url);
+ LOG.debug( "Commit messages {} to queue {}", queueMessages.size(), url);
List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
for(QueueMessage message : queueMessages){
entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(),message.getHandle()));
@@ -233,16 +230,11 @@ public class SQSQueueManagerImpl implements QueueManager {
public class SqsLoader {
private final String key;
- private final AmazonSQSClient client;
public SqsLoader(String key, AmazonSQSClient client) {
this.key = key;
- this.client = client;
}
- public AmazonSQSClient getClient() {
- return client;
- }
public String getKey() {
return key;
[3/5] incubator-usergrid git commit: Merge branch 'USERGRID-466' of
https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-466
Posted by to...@apache.org.
Merge branch 'USERGRID-466' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-466
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c47f32a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c47f32a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c47f32a7
Branch: refs/heads/USERGRID-466
Commit: c47f32a72d3ab322985e66e893d1622af3be16fc
Parents: 2391974 31bee37
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 16:12:48 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 16:12:48 2015 -0600
----------------------------------------------------------------------
.../apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c47f32a7/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
[2/5] incubator-usergrid git commit: Refactored index messages to be
serializable.
Posted by to...@apache.org.
Refactored index messages to be serializable.
Consumers now rolls up index requests and flushes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/23919745
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/23919745
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/23919745
Branch: refs/heads/USERGRID-466
Commit: 23919745883ff9d60c18ce75fc1a056a017bce37
Parents: cd0015d
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 16:12:34 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 16:12:34 2015 -0600
----------------------------------------------------------------------
stack/corepersistence/queryindex/pom.xml | 8 +
.../usergrid/persistence/index/IndexFig.java | 7 +
.../index/IndexOperationMessage.java | 29 +++-
.../persistence/index/guice/IndexModule.java | 2 +
.../persistence/index/impl/BatchRequest.java | 41 +++++
.../index/impl/BufferQueueInMemory.java | 87 ----------
.../index/impl/BufferQueueInMemoryImpl.java | 87 ++++++++++
.../index/impl/BufferQueueSQSImpl.java | 158 +++++++++++++++++++
.../persistence/index/impl/DeIndexRequest.java | 106 +++++++++++++
.../index/impl/EsEntityIndexBatchImpl.java | 39 ++---
.../index/impl/EsEntityIndexImpl.java | 2 +-
.../index/impl/EsIndexBufferConsumerImpl.java | 37 ++---
.../persistence/index/impl/IndexRequest.java | 117 ++++++++++++++
.../index/guice/TestIndexModule.java | 6 +-
.../persistence/queue/QueueManager.java | 4 +-
15 files changed, 581 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/pom.xml b/stack/corepersistence/queryindex/pom.xml
index f6ae718..a5fbf6a 100644
--- a/stack/corepersistence/queryindex/pom.xml
+++ b/stack/corepersistence/queryindex/pom.xml
@@ -108,6 +108,14 @@
</dependency>
<dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>queue</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ </dependency>
+
+
+ <dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index befbaa9..ce14449 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -55,6 +55,8 @@ public interface IndexFig extends GuicyFig {
public static final String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout";
+ public static final String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout";
+
public static final String INDEX_BATCH_SIZE = "elasticsearch.batch_size";
public static final String INDEX_WRITE_CONSISTENCY_LEVEL = "elasticsearch.write_consistency_level";
@@ -167,4 +169,9 @@ public interface IndexFig extends GuicyFig {
@Default("1000")
@Key( FAILURE_REJECTED_RETRY_WAIT_TIME )
long getFailureRetryTime();
+
+ //give us 60 seconds to process the message
+ @Default("60")
+ @Key(INDEX_QUEUE_READ_TIMEOUT)
+ int getIndexQueueTimeout();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 944a71f..43eaa01 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -17,23 +17,28 @@
package org.apache.usergrid.persistence.index;
import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.index.impl.BatchRequest;
+
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
+import java.io.Serializable;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Container for index operations.
*/
-public class IndexOperationMessage {
- private final ConcurrentLinkedQueue<ActionRequestBuilder> builders;
+public class IndexOperationMessage implements Serializable {
+ private final Set<BatchRequest> builders;
private final BetterFuture<IndexOperationMessage> containerFuture;
public IndexOperationMessage(){
final IndexOperationMessage parent = this;
- builders = new ConcurrentLinkedQueue<>();
+ this.builders = new HashSet<>();
this.containerFuture = new BetterFuture<>(new Callable<IndexOperationMessage>() {
@Override
public IndexOperationMessage call() throws Exception {
@@ -42,7 +47,21 @@ public class IndexOperationMessage {
});
}
- public void addOperation(ActionRequestBuilder builder){
+
+ /**
+ * Add all our operations in the set
+ * @param requests
+ */
+ public void setOperations(final Set<BatchRequest> requests){
+ this.builders.addAll( requests);
+ }
+
+
+ /**
+ * Add the operation to the set
+ * @param builder
+ */
+ public void addOperation(BatchRequest builder){
builders.add(builder);
}
@@ -50,7 +69,7 @@ public class IndexOperationMessage {
* return operations for the message
* @return
*/
- public ConcurrentLinkedQueue<ActionRequestBuilder> getOperations(){
+ public Set<BatchRequest> getOperations(){
return builders;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index d911dab..b03e1c0 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
import org.apache.usergrid.persistence.index.impl.EsIndexBufferConsumerImpl;
import org.apache.usergrid.persistence.index.impl.EsIndexBufferProducerImpl;
import org.apache.usergrid.persistence.map.guice.MapModule;
+import org.apache.usergrid.persistence.queue.guice.QueueModule;
import org.safehaus.guicyfig.GuicyFigModule;
@@ -41,6 +42,7 @@ public abstract class IndexModule extends AbstractModule {
install(new GuicyFigModule(IndexFig.class));
install(new MapModule());
+ install(new QueueModule());
bind(EntityIndexFactory.class).to( EsEntityIndexFactoryImpl.class );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BatchRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BatchRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BatchRequest.java
new file mode 100644
index 0000000..df6c5b0
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BatchRequest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.io.Serializable;
+
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.client.Client;
+
+
+/**
+ * A batch request we can serialize and construct on receive
+ */
+public interface BatchRequest extends Serializable {
+
+
+ /**
+ * Passing the client and the bulk request, add ourselves to the bulk request
+ * @param client
+ * @param bulkRequest
+ */
+ public void doOperation(final Client client, final BulkRequestBuilder bulkRequest );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
deleted file mode 100644
index 403762f..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.index.impl;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.IndexOperationMessage;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-
-@Singleton
-public class BufferQueueInMemory implements BufferQueue {
-
- private final ArrayBlockingQueue<IndexOperationMessage> messages;
-
-
- @Inject
- public BufferQueueInMemory(final IndexFig fig ) {
- messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() );
- }
-
-
- @Override
- public void offer( final IndexOperationMessage operation ) {
- messages.offer( operation );
- }
-
-
- @Override
- public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
-
- final List<IndexOperationMessage> response = new ArrayList<>( takeSize );
-
- final long endTime = System.currentTimeMillis() + timeUnit.toMillis( timeout );
-
- //loop until we're we're full or we time out
- do {
- try {
- //we received 1, try to drain
- IndexOperationMessage polled = messages.poll( timeout, timeUnit );
-
- //drain
- if ( polled != null ) {
- response.add( polled );
- messages.drainTo( response, takeSize - response.size() );
- }
- }
- catch ( InterruptedException ie ) {
- //swallow
-
- }
- }
- while ( response.size() < takeSize && System.currentTimeMillis() < endTime );
-
- return response;
- }
-
-
- @Override
- public void ack( final List<IndexOperationMessage> messages ) {
- //no op for this
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
new file mode 100644
index 0000000..ef0ef5f
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+@Singleton
+public class BufferQueueInMemoryImpl implements BufferQueue {
+
+ private final ArrayBlockingQueue<IndexOperationMessage> messages;
+
+
+ @Inject
+ public BufferQueueInMemoryImpl( final IndexFig fig ) {
+ messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() );
+ }
+
+
+ @Override
+ public void offer( final IndexOperationMessage operation ) {
+ messages.offer( operation );
+ }
+
+
+ @Override
+ public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
+
+ final List<IndexOperationMessage> response = new ArrayList<>( takeSize );
+
+ final long endTime = System.currentTimeMillis() + timeUnit.toMillis( timeout );
+
+ //loop until we're we're full or we time out
+ do {
+ try {
+ //we received 1, try to drain
+ IndexOperationMessage polled = messages.poll( timeout, timeUnit );
+
+ //drain
+ if ( polled != null ) {
+ response.add( polled );
+ messages.drainTo( response, takeSize - response.size() );
+ }
+ }
+ catch ( InterruptedException ie ) {
+ //swallow
+
+ }
+ }
+ while ( response.size() < takeSize && System.currentTimeMillis() < endTime );
+
+ return response;
+ }
+
+
+ @Override
+ public void ack( final List<IndexOperationMessage> messages ) {
+ //no op for this
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
new file mode 100644
index 0000000..b814603
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.elasticsearch.action.ActionRequestBuilder;
+
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+@Singleton
+public class BufferQueueSQSImpl implements BufferQueue {
+
+ /** Hacky, copied from CPEntityManager b/c we can't access it here */
+ public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-11ddb1de66c8" );
+
+
+ private static final String QUEUE_NAME = "es_queue";
+
+ private final QueueManager queue;
+ private final IndexFig indexFig;
+
+
+ @Inject
+ public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final IndexFig indexFig ) {
+ final QueueScope scope =
+ new QueueScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), QUEUE_NAME );
+
+ this.queue = queueManagerFactory.getQueueManager( scope );
+ this.indexFig = indexFig;
+ }
+
+
+ @Override
+ public void offer( final IndexOperationMessage operation ) {
+ final Message toQueue = new Message( operation.getOperations() );
+
+
+
+
+ try {
+ this.queue.sendMessage( toQueue );
+ operation.getFuture().run();
+ }
+ catch ( IOException e ) {
+ throw new RuntimeException( "Unable to queue message", e );
+ }
+ }
+
+
+ @Override
+ public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
+
+ //loop until we're we're full or we time out
+ List<QueueMessage> messages = queue
+ .getMessages( takeSize, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
+ Message.class );
+
+
+ final List<IndexOperationMessage> response = new ArrayList<>( messages.size() );
+
+ for ( final QueueMessage message : messages ) {
+
+ SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message );
+
+ operation.setOperations( ( ( Message ) message.getBody() ).getData() );
+
+ response.add( operation );
+ }
+
+ return response;
+ }
+
+
+ @Override
+ public void ack( final List<IndexOperationMessage> messages ) {
+
+ List<QueueMessage> toAck = new ArrayList<>( messages.size() );
+
+ for(IndexOperationMessage ioe: messages){
+ toAck.add( ((SqsIndexOperationMessage)ioe).getMessage() );
+ }
+
+ queue.commitMessages( toAck );
+ }
+
+
+ /**
+ * The message to queue to SQS
+ */
+ public static final class Message implements Serializable {
+ private final Set<BatchRequest> data;
+
+
+ private Message( final Set<BatchRequest> data ) {this.data = data;}
+
+
+ public Set<BatchRequest> getData() {
+ return data;
+ }
+ }
+
+
+ /**
+ * The message that subclasses our IndexOperationMessage. holds a pointer to the original message
+ */
+ public class SqsIndexOperationMessage extends IndexOperationMessage {
+
+ private final QueueMessage message;
+
+
+ public SqsIndexOperationMessage( final QueueMessage message ) {this.message = message;}
+
+
+ /**
+ * Get the message from our queue
+ */
+ public QueueMessage getMessage() {
+ return message;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
new file mode 100644
index 0000000..a279f16
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.Arrays;
+
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.client.Client;
+
+
+/**
+ * Represent the properties required to build an index request
+ */
+public class DeIndexRequest implements BatchRequest {
+
+ public final String[] indexes;
+ public final String entityType;
+ public final String documentId;
+
+
+ public DeIndexRequest( final String[] indexes, final String entityType, final String documentId) {
+ this.indexes = indexes;
+ this.entityType = entityType;
+ this.documentId = documentId;
+ }
+
+
+ @Override
+ public void doOperation(final Client client, final BulkRequestBuilder bulkRequest ){
+
+
+ for(final String index: indexes) {
+ final DeleteRequestBuilder builder = client.prepareDelete( index, entityType, documentId);
+
+ bulkRequest.add( builder );
+ }
+ }
+
+
+ public String[] getIndexes() {
+ return indexes;
+ }
+
+
+ public String getEntityType() {
+ return entityType;
+ }
+
+
+ public String getDocumentId() {
+ return documentId;
+ }
+
+
+ @Override
+ public boolean equals( final Object o ) {
+ if ( this == o ) {
+ return true;
+ }
+ if ( o == null || getClass() != o.getClass() ) {
+ return false;
+ }
+
+ final DeIndexRequest that = ( DeIndexRequest ) o;
+
+ if ( !documentId.equals( that.documentId ) ) {
+ return false;
+ }
+ if ( !entityType.equals( that.entityType ) ) {
+ return false;
+ }
+ if ( !Arrays.equals( indexes, that.indexes ) ) {
+ return false;
+ }
+
+ return true;
+ }
+
+
+ @Override
+ public int hashCode() {
+ int result = Arrays.hashCode( indexes );
+ result = 31 * result + entityType.hashCode();
+ result = 31 * result + documentId.hashCode();
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index d987b29..b0c731e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -73,10 +73,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private final ApplicationScope applicationScope;
- private final Client client;
-
- private final boolean refresh;
-
private final IndexIdentifier.IndexAlias alias;
private final IndexIdentifier indexIdentifier;
@@ -85,21 +81,17 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private final AliasedEntityIndex entityIndex;
private IndexOperationMessage container;
- private final Timer batchTimer;
- public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,
+ public EsEntityIndexBatchImpl(final ApplicationScope applicationScope,
final IndexBufferProducer indexBatchBufferProducer,final IndexFig config,
- final AliasedEntityIndex entityIndex,final MetricsFactory metricsFactory ) {
+ final AliasedEntityIndex entityIndex ) {
this.applicationScope = applicationScope;
- this.client = client;
this.indexBatchBufferProducer = indexBatchBufferProducer;
this.entityIndex = entityIndex;
this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
this.alias = indexIdentifier.getAlias();
- this.refresh = config.isForcedRefresh();
- this.batchTimer = metricsFactory.getTimer( EsEntityIndexBatchImpl.class, "entity.index.batch.timer" );
//constrained
this.container = new IndexOperationMessage();
}
@@ -133,9 +125,10 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
log.debug( "Indexing entity documentId {} data {} ", indexId, entityAsMap );
final String entityType = entity.getId().getType();
- IndexRequestBuilder builder =
- client.prepareIndex(alias.getWriteAlias(), entityType, indexId).setSource( entityAsMap );
- container.addOperation(builder);
+
+
+ container.addOperation(new IndexRequest(alias.getWriteAlias(), entityType, indexId, entityAsMap));
+
return this;
}
@@ -174,23 +167,9 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
if(indexes == null ||indexes.length == 0){
indexes = new String[]{indexIdentifier.getIndex(null)};
}
- //get all indexes then flush everyone
- Timer.Context timeDeindex = batchTimer.time();
- Observable.from(indexes)
- .map(new Func1<String, Object>() {
- @Override
- public Object call(String index) {
- try {
- DeleteRequestBuilder builder = client.prepareDelete(index, entityType, indexId).setRefresh(refresh);
- container.addOperation(builder);
- }catch (Exception e){
- log.error("failed to deindex",e);
- throw e;
- }
- return index;
- }
- }).toBlocking().last();
- timeDeindex.stop();
+
+ container.addOperation( new DeIndexRequest( indexes, entityType, indexId ) );
+
log.debug("Deindexed Entity with index id " + indexId);
return this;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index a1a8ca7..7bdb41a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -400,7 +400,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
@Override
public EntityIndexBatch createBatch() {
EntityIndexBatch batch = new EsEntityIndexBatchImpl(
- applicationScope, esProvider.getClient(),indexBatchBufferProducer, config, this, metricsFactory );
+ applicationScope, indexBatchBufferProducer, config, this );
return batch;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 45c12a1..2342398 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -26,7 +26,6 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.IndexBufferConsumer;
-import org.apache.usergrid.persistence.index.IndexBufferProducer;
import org.apache.usergrid.persistence.index.IndexFig;
import org.apache.usergrid.persistence.index.IndexOperationMessage;
import org.elasticsearch.action.ActionRequestBuilder;
@@ -37,7 +36,6 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.slf4j.Logger;
@@ -48,7 +46,6 @@ import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -88,6 +85,10 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
this.consumer = Observable.create( new Observable.OnSubscribe<IndexOperationMessage>() {
@Override
public void call( final Subscriber<? super IndexOperationMessage> subscriber ) {
+
+ //name our thread so it's easy to see
+ Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
+
List<IndexOperationMessage> drainList;
do {
try {
@@ -130,7 +131,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
}
while ( true );
}
- } ).subscribeOn( Schedulers.io() ).buffer( config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS,
+ } ).subscribeOn( Schedulers.newThread() ).buffer( config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS,
config.getIndexBufferSize() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
@Override
public void call( List<IndexOperationMessage> containerList ) {
@@ -157,37 +158,29 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
}
//process and flatten all the messages to builder requests
- Observable<ActionRequestBuilder> flattenMessages = Observable.from(operationMessages)
+ Observable<BatchRequest> flattenMessages = Observable.from(operationMessages)
.subscribeOn(Schedulers.io())
- .flatMap(new Func1<IndexOperationMessage, Observable<ActionRequestBuilder>>() {
+ .flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
@Override
- public Observable<ActionRequestBuilder> call(IndexOperationMessage operationMessage) {
- return Observable.from(operationMessage.getOperations());
+ public Observable<BatchRequest> call( IndexOperationMessage operationMessage ) {
+ return Observable.from( operationMessage.getOperations() );
}
- });
+ } );
//batch shard operations into a bulk request
flattenMessages
.buffer(config.getIndexBatchSize())
- .doOnNext(new Action1<List<ActionRequestBuilder>>() {
+ .doOnNext(new Action1<List<BatchRequest>>() {
@Override
- public void call(List<ActionRequestBuilder> builders) {
+ public void call(List<BatchRequest> builders) {
try {
final BulkRequestBuilder bulkRequest = initRequest();
- for (ActionRequestBuilder builder : builders) {
+ for (BatchRequest builder : builders) {
indexSizeCounter.dec();
- if (builder instanceof IndexRequestBuilder) {
- bulkRequest.add((IndexRequestBuilder) builder);
- }
- if (builder instanceof DeleteRequestBuilder) {
- bulkRequest.add((DeleteRequestBuilder) builder);
- }
- if(builder instanceof DeleteByQueryRequestBuilder){
- DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = (DeleteByQueryRequestBuilder) builder;
- deleteByQueryRequestBuilder.get();
- }
+
+ builder.doOperation( client, bulkRequest );
}
sendRequest(bulkRequest);
}catch (Exception e){
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
new file mode 100644
index 0000000..381d005
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.Map;
+
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+
+
+/**
+ * Represent the properties required to build an index request
+ */
+public class IndexRequest implements BatchRequest {
+
+ public final String writeAlias;
+ public final String entityType;
+ public final String documentId;
+
+ public final Map<String, Object> data;
+
+
+ public IndexRequest( final String writeAlias, final String entityType, final String documentId,
+ final Map<String, Object> data ) {
+ this.writeAlias = writeAlias;
+ this.entityType = entityType;
+ this.documentId = documentId;
+ this.data = data;
+ }
+
+
+ public void doOperation(final Client client, final BulkRequestBuilder bulkRequest ){
+ IndexRequestBuilder builder =
+ client.prepareIndex(writeAlias, entityType, documentId).setSource( data );
+
+
+ bulkRequest.add( builder );
+
+ }
+
+
+ public String getWriteAlias() {
+ return writeAlias;
+ }
+
+
+ public String getEntityType() {
+ return entityType;
+ }
+
+
+ public String getDocumentId() {
+ return documentId;
+ }
+
+
+ public Map<String, Object> getData() {
+ return data;
+ }
+
+
+ @Override
+ public boolean equals( final Object o ) {
+ if ( this == o ) {
+ return true;
+ }
+ if ( o == null || getClass() != o.getClass() ) {
+ return false;
+ }
+
+ final IndexRequest that = ( IndexRequest ) o;
+
+ if ( !data.equals( that.data ) ) {
+ return false;
+ }
+ if ( !documentId.equals( that.documentId ) ) {
+ return false;
+ }
+ if ( !entityType.equals( that.entityType ) ) {
+ return false;
+ }
+ if ( !writeAlias.equals( that.writeAlias ) ) {
+ return false;
+ }
+
+ return true;
+ }
+
+
+ @Override
+ public int hashCode() {
+ int result = writeAlias.hashCode();
+ result = 31 * result + entityType.hashCode();
+ result = 31 * result + documentId.hashCode();
+ result = 31 * result + data.hashCode();
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 7e2312d..57c2fab 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -23,7 +23,8 @@ import org.apache.usergrid.persistence.collection.guice.CollectionModule;
import org.apache.usergrid.persistence.core.guice.TestModule;
import org.apache.usergrid.persistence.core.guice.CommonModule;
import org.apache.usergrid.persistence.index.impl.BufferQueue;
-import org.apache.usergrid.persistence.index.impl.BufferQueueInMemory;
+import org.apache.usergrid.persistence.index.impl.BufferQueueInMemoryImpl;
+import org.apache.usergrid.persistence.index.impl.BufferQueueSQSImpl;
public class TestIndexModule extends TestModule {
@@ -37,7 +38,8 @@ public class TestIndexModule extends TestModule {
install( new IndexModule() {
@Override
public void wireBufferQueue() {
- bind( BufferQueue.class).to( BufferQueueInMemory.class );
+ bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
+// bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
}
} );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 223860e..dd044d2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -29,8 +29,8 @@ public interface QueueManager {
/**
* Read messages from queue
* @param limit
- * @param transactionTimeout timeout in ms
- * @param waitTime wait time for next message in ms
+ * @param transactionTimeout timeout in seconds
+ * @param waitTime wait time for next message in milliseconds
* @param klass class to cast the return from
* @return List of Queue Messages
*/
[4/5] incubator-usergrid git commit: Changed consumer structure of
buffer/wait timeout. This how happens implicitly in our queue take,
and is no longer necessary. .
Posted by to...@apache.org.
Changed consumer structure of buffer/wait timeout. This how happens implicitly in our queue take, and is no longer necessary.
.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9111d944
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9111d944
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9111d944
Branch: refs/heads/USERGRID-466
Commit: 9111d94481bc15490b477f9ca48dd2565ca0e9dd
Parents: c47f32a
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 17:25:23 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 17:25:23 2015 -0600
----------------------------------------------------------------------
.../usergrid/corepersistence/CoreModule.java | 9 ++-
.../usergrid/persistence/index/IndexFig.java | 47 ++++++-----
.../index/IndexOperationMessage.java | 83 ++++++++++++--------
.../index/impl/BufferQueueInMemoryImpl.java | 5 +-
.../index/impl/BufferQueueSQSImpl.java | 54 +++++--------
.../persistence/index/impl/DeIndexRequest.java | 5 ++
.../index/impl/EsEntityIndexBatchImpl.java | 12 ++-
.../index/impl/EsIndexBufferConsumerImpl.java | 39 +++++----
.../index/impl/EsIndexBufferProducerImpl.java | 3 +-
.../persistence/index/impl/IndexRequest.java | 24 ++++--
.../impl/EntityConnectionIndexImplTest.java | 4 +-
.../queue/impl/SQSQueueManagerImpl.java | 8 +-
12 files changed, 174 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 8d99586..7b53d67 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -34,6 +34,8 @@ import org.apache.usergrid.persistence.core.guice.CommonModule;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.graph.guice.GraphModule;
import org.apache.usergrid.persistence.index.guice.IndexModule;
+import org.apache.usergrid.persistence.index.impl.BufferQueue;
+import org.apache.usergrid.persistence.index.impl.BufferQueueSQSImpl;
import org.apache.usergrid.persistence.map.guice.MapModule;
import org.apache.usergrid.persistence.queue.guice.QueueModule;
import org.slf4j.Logger;
@@ -69,7 +71,12 @@ public class CoreModule extends AbstractModule {
install( new CommonModule());
install(new CollectionModule());
install(new GraphModule());
- install(new IndexModule());
+ install( new IndexModule() {
+ @Override
+ public void wireBufferQueue() {
+ bind(BufferQueue.class).to( BufferQueueSQSImpl.class );
+ }
+ } );
install(new MapModule());
install(new QueueModule());
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index ce14449..cde86fd 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -18,6 +18,7 @@
*/
package org.apache.usergrid.persistence.index;
+
import org.safehaus.guicyfig.Default;
import org.safehaus.guicyfig.FigSingleton;
import org.safehaus.guicyfig.GuicyFig;
@@ -55,8 +56,16 @@ public interface IndexFig extends GuicyFig {
public static final String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout";
+ /**
+ * Amount of time to wait when reading from the queue
+ */
public static final String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout";
+ /**
+ * Amount of time to wait when reading from the queue in milliseconds
+ */
+ public static final String INDEX_QUEUE_TRANSACTION_TIMEOUT = "elasticsearch.queue_transaction_timeout";
+
public static final String INDEX_BATCH_SIZE = "elasticsearch.batch_size";
public static final String INDEX_WRITE_CONSISTENCY_LEVEL = "elasticsearch.write_consistency_level";
@@ -67,9 +76,10 @@ public interface IndexFig extends GuicyFig {
public static final String ELASTICSEARCH_FAIL_REFRESH = "elasticsearch.fail_refresh";
/**
- * Amount of time in milliseconds to wait when ES rejects our request before retrying. Provides simple backpressure
+ * Amount of time in milliseconds to wait when ES rejects our request before retrying. Provides simple
+ * backpressure
*/
- public static final String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait";
+ public static final String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait";
public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
@@ -82,7 +92,7 @@ public interface IndexFig extends GuicyFig {
int getPort();
@Default( "usergrid" )
- @Key( ELASTICSEARCH_CLUSTER_NAME)
+ @Key( ELASTICSEARCH_CLUSTER_NAME )
String getClusterName();
@Default( "usergrid" ) // no underbars allowed
@@ -111,15 +121,15 @@ public interface IndexFig extends GuicyFig {
public boolean isForcedRefresh();
/** Identify the client node with a unique name. */
- @Default("default")
+ @Default( "default" )
@Key( ELASTICSEARCH_NODENAME )
public String getNodeName();
- @Default("6")
+ @Default( "6" )
@Key( ELASTICSEARCH_NUMBER_OF_SHARDS )
public int getNumberOfShards();
- @Default("1")
+ @Default( "1" )
@Key( ELASTICSEARCH_NUMBER_OF_REPLICAS )
public int getNumberOfReplicas();
@@ -127,51 +137,48 @@ public interface IndexFig extends GuicyFig {
@Key( ELASTICSEARCH_FAIL_REFRESH )
int getFailRefreshCount();
- @Default("2")
+ @Default( "2" )
int getIndexCacheMaxWorkers();
/**
* how long to wait before the buffer flushes to send
- * @return
*/
- @Default("250")
+ @Default( "250" )
@Key( INDEX_BUFFER_TIMEOUT )
long getIndexBufferTimeout();
/**
* size of the buffer to build up before you send results
- * @return
*/
- @Default("1000")
+ @Default( "1000" )
@Key( INDEX_BUFFER_SIZE )
int getIndexBufferSize();
/**
* size of the buffer to build up before you send results
- * @return
*/
- @Default("1000")
+ @Default( "1000" )
@Key( INDEX_QUEUE_SIZE )
int getIndexQueueSize();
/**
* Request batch size for ES
- * @return
*/
- @Default("1000")
- @Key( INDEX_BATCH_SIZE)
+ @Default( "1000" )
+ @Key( INDEX_BATCH_SIZE )
int getIndexBatchSize();
- @Default("one")
+ @Default( "one" )
@Key( INDEX_WRITE_CONSISTENCY_LEVEL )
String getWriteConsistencyLevel();
- @Default("1000")
+ @Default( "1000" )
@Key( FAILURE_REJECTED_RETRY_WAIT_TIME )
long getFailureRetryTime();
//give us 60 seconds to process the message
- @Default("60")
- @Key(INDEX_QUEUE_READ_TIMEOUT)
+ @Default( "60" )
+ @Key( INDEX_QUEUE_READ_TIMEOUT )
int getIndexQueueTimeout();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 43eaa01..7d8a859 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -16,69 +16,84 @@
*/
package org.apache.usergrid.persistence.index;
-import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.apache.usergrid.persistence.index.impl.BatchRequest;
-
-import org.elasticsearch.action.ActionRequestBuilder;
-import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
import java.io.Serializable;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.index.impl.DeIndexRequest;
+import org.apache.usergrid.persistence.index.impl.IndexRequest;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
/**
* Container for index operations.
*/
-public class IndexOperationMessage implements Serializable {
- private final Set<BatchRequest> builders;
+public class IndexOperationMessage implements Serializable {
+ private final Set<IndexRequest> indexRequests;
+ private final Set<DeIndexRequest> deIndexRequests;
+
+
+
private final BetterFuture<IndexOperationMessage> containerFuture;
- public IndexOperationMessage(){
+
+ public IndexOperationMessage() {
final IndexOperationMessage parent = this;
- this.builders = new HashSet<>();
- this.containerFuture = new BetterFuture<>(new Callable<IndexOperationMessage>() {
+ this.indexRequests = new HashSet<>();
+ this.deIndexRequests = new HashSet<>();
+ this.containerFuture = new BetterFuture<>( new Callable<IndexOperationMessage>() {
@Override
public IndexOperationMessage call() throws Exception {
return parent;
}
- });
+ } );
}
- /**
- * Add all our operations in the set
- * @param requests
- */
- public void setOperations(final Set<BatchRequest> requests){
- this.builders.addAll( requests);
+ public void addIndexRequest( final IndexRequest indexRequest ) {
+ indexRequests.add( indexRequest );
}
- /**
- * Add the operation to the set
- * @param builder
- */
- public void addOperation(BatchRequest builder){
- builders.add(builder);
+ public void addAllIndexRequest( final Set<IndexRequest> indexRequests ) {
+ indexRequests.addAll( indexRequests );
}
- /**
- * return operations for the message
- * @return
- */
- public Set<BatchRequest> getOperations(){
- return builders;
+
+ public void addDeIndexRequest( final DeIndexRequest deIndexRequest ) {
+ deIndexRequests.add( deIndexRequest );
+ }
+
+
+ public void addAllDeIndexRequest( final Set<DeIndexRequest> deIndexRequests ) {
+ deIndexRequests.addAll( deIndexRequests );
+ }
+
+
+ public Set<IndexRequest> getIndexRequests() {
+ return indexRequests;
+ }
+
+
+ public Set<DeIndexRequest> getDeIndexRequests() {
+ return deIndexRequests;
+ }
+
+
+ @JsonIgnore
+ public boolean isEmpty(){
+ return indexRequests.isEmpty() && deIndexRequests.isEmpty();
}
/**
* return the promise
- * @return
*/
- public BetterFuture<IndexOperationMessage> getFuture(){
+ @JsonIgnore
+ public BetterFuture<IndexOperationMessage> getFuture() {
return containerFuture;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
index ef0ef5f..1973e5d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
@@ -60,8 +60,11 @@ public class BufferQueueInMemoryImpl implements BufferQueue {
//loop until we're we're full or we time out
do {
try {
+
+ final long remaining = endTime - System.currentTimeMillis();
+
//we received 1, try to drain
- IndexOperationMessage polled = messages.poll( timeout, timeUnit );
+ IndexOperationMessage polled = messages.poll( remaining, timeUnit );
//drain
if ( polled != null ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
index b814603..833e045 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
@@ -21,15 +21,11 @@ package org.apache.usergrid.persistence.index.impl;
import java.io.IOException;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import org.elasticsearch.action.ActionRequestBuilder;
-
import org.apache.usergrid.persistence.index.IndexFig;
import org.apache.usergrid.persistence.index.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.SimpleId;
@@ -39,7 +35,6 @@ import org.apache.usergrid.persistence.queue.QueueMessage;
import org.apache.usergrid.persistence.queue.QueueScope;
import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
-import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -69,13 +64,10 @@ public class BufferQueueSQSImpl implements BufferQueue {
@Override
public void offer( final IndexOperationMessage operation ) {
- final Message toQueue = new Message( operation.getOperations() );
-
-
try {
- this.queue.sendMessage( toQueue );
+ this.queue.sendMessage( operation );
operation.getFuture().run();
}
catch ( IOException e ) {
@@ -87,19 +79,22 @@ public class BufferQueueSQSImpl implements BufferQueue {
@Override
public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
- //loop until we're we're full or we time out
+ //SQS doesn't support more than 10
+
+ final int actualTake = Math.min( 10, takeSize );
+
List<QueueMessage> messages = queue
- .getMessages( takeSize, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
- Message.class );
+ .getMessages( actualTake, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
+ IndexOperationMessage.class );
final List<IndexOperationMessage> response = new ArrayList<>( messages.size() );
for ( final QueueMessage message : messages ) {
- SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message );
+ final IndexOperationMessage messageBody = ( IndexOperationMessage ) message.getBody();
- operation.setOperations( ( ( Message ) message.getBody() ).getData() );
+ SqsIndexOperationMessage operation = new SqsIndexOperationMessage(message, messageBody );
response.add( operation );
}
@@ -111,10 +106,15 @@ public class BufferQueueSQSImpl implements BufferQueue {
@Override
public void ack( final List<IndexOperationMessage> messages ) {
+ //nothing to do
+ if(messages.size() == 0){
+ return;
+ }
+
List<QueueMessage> toAck = new ArrayList<>( messages.size() );
- for(IndexOperationMessage ioe: messages){
- toAck.add( ((SqsIndexOperationMessage)ioe).getMessage() );
+ for ( IndexOperationMessage ioe : messages ) {
+ toAck.add( ( ( SqsIndexOperationMessage ) ioe ).getMessage() );
}
queue.commitMessages( toAck );
@@ -122,22 +122,6 @@ public class BufferQueueSQSImpl implements BufferQueue {
/**
- * The message to queue to SQS
- */
- public static final class Message implements Serializable {
- private final Set<BatchRequest> data;
-
-
- private Message( final Set<BatchRequest> data ) {this.data = data;}
-
-
- public Set<BatchRequest> getData() {
- return data;
- }
- }
-
-
- /**
* The message that subclasses our IndexOperationMessage. holds a pointer to the original message
*/
public class SqsIndexOperationMessage extends IndexOperationMessage {
@@ -145,7 +129,11 @@ public class BufferQueueSQSImpl implements BufferQueue {
private final QueueMessage message;
- public SqsIndexOperationMessage( final QueueMessage message ) {this.message = message;}
+ public SqsIndexOperationMessage( final QueueMessage message, final IndexOperationMessage source ) {
+ this.message = message;
+ this.addAllDeIndexRequest( source.getDeIndexRequests() );
+ this.addAllIndexRequest( source.getIndexRequests() );
+ }
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
index a279f16..c63c4df 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
@@ -26,10 +26,15 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.client.Client;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
/**
* Represent the properties required to build an index request
*/
+@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
public class DeIndexRequest implements BatchRequest {
public final String[] indexes;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index b0c731e..b63dfe6 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -127,7 +127,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
final String entityType = entity.getId().getType();
- container.addOperation(new IndexRequest(alias.getWriteAlias(), entityType, indexId, entityAsMap));
+ container.addIndexRequest(new IndexRequest(alias.getWriteAlias(), entityType, indexId, entityAsMap));
return this;
}
@@ -168,7 +168,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
indexes = new String[]{indexIdentifier.getIndex(null)};
}
- container.addOperation( new DeIndexRequest( indexes, entityType, indexId ) );
+ container.addDeIndexRequest( new DeIndexRequest( indexes, entityType, indexId ) );
log.debug("Deindexed Entity with index id " + indexId);
@@ -192,6 +192,14 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
public BetterFuture execute() {
IndexOperationMessage tempContainer = container;
container = new IndexOperationMessage();
+
+ /**
+ * No-op, just disregard it
+ */
+ if(tempContainer.isEmpty()){
+ return tempContainer.getFuture();
+ }
+
return indexBatchBufferProducer.put(tempContainer);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 2342398..8547889 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -82,9 +82,9 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
final AtomicInteger countFail = new AtomicInteger();
//batch up sets of some size and send them in batch
- this.consumer = Observable.create( new Observable.OnSubscribe<IndexOperationMessage>() {
+ this.consumer = Observable.create( new Observable.OnSubscribe<List<IndexOperationMessage>>() {
@Override
- public void call( final Subscriber<? super IndexOperationMessage> subscriber ) {
+ public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
//name our thread so it's easy to see
Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
@@ -99,21 +99,22 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
drainList = bufferQueue
.take( config.getIndexBufferSize(), config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS );
- for ( IndexOperationMessage drained : drainList ) {
- subscriber.onNext( drained );
- }
- bufferQueue.ack( drainList );
+ subscriber.onNext( drainList );
+
timer.stop();
countFail.set( 0 );
}
- catch( EsRejectedExecutionException err) {
+ catch ( EsRejectedExecutionException err ) {
countFail.incrementAndGet();
- log.error( "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying. Failed {} consecutive times", config.getFailRefreshCount(), countFail.get() );
+ log.error(
+ "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying. "
+ + "Failed {} consecutive times",
+ config.getFailRefreshCount(), countFail.get() );
- //es rejected the exception, sleep and retry in the queue
+ //es rejected the exception, sleep and retry in the queue
try {
Thread.sleep( config.getFailureRetryTime() );
}
@@ -131,8 +132,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
}
while ( true );
}
- } ).subscribeOn( Schedulers.newThread() ).buffer( config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS,
- config.getIndexBufferSize() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
+ } ).subscribeOn( Schedulers.newThread() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
@Override
public void call( List<IndexOperationMessage> containerList ) {
if ( containerList.size() > 0 ) {
@@ -142,7 +142,14 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
time.stop();
}
}
- } );
+} )
+ //ack after we process
+ .doOnNext( new Action1<List<IndexOperationMessage>>() {
+ @Override
+ public void call( final List<IndexOperationMessage> indexOperationMessages ) {
+ bufferQueue.ack( indexOperationMessages );
+ }
+ } );
//start in the background
consumer.subscribe();
@@ -163,15 +170,17 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
.flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
@Override
public Observable<BatchRequest> call( IndexOperationMessage operationMessage ) {
- return Observable.from( operationMessage.getOperations() );
+ final Observable<DeIndexRequest> deIndex = Observable.from( operationMessage.getDeIndexRequests () );
+ final Observable<IndexRequest> index = Observable.from( operationMessage.getIndexRequests() );
+
+ return Observable.merge( deIndex, index );
}
} );
//batch shard operations into a bulk request
- flattenMessages
- .buffer(config.getIndexBatchSize())
+ flattenMessages.toList()
.doOnNext(new Action1<List<BatchRequest>>() {
@Override
public void call(List<BatchRequest> builders) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
index db1f50e..61d5d25 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
@@ -54,7 +54,8 @@ public class EsIndexBufferProducerImpl implements IndexBufferProducer {
public BetterFuture put(IndexOperationMessage message){
Preconditions.checkNotNull(message, "Message cannot be null");
- indexSizeCounter.inc(message.getOperations().size());
+ indexSizeCounter.inc(message.getDeIndexRequests().size());
+ indexSizeCounter.inc(message.getIndexRequests().size());
Timer.Context time = timer.time();
bufferQueue.offer( message );
time.stop();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
index 381d005..4ec4092 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
@@ -26,17 +26,20 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
/**
* Represent the properties required to build an index request
*/
+@JsonTypeInfo( use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class" )
public class IndexRequest implements BatchRequest {
- public final String writeAlias;
- public final String entityType;
- public final String documentId;
+ public String writeAlias;
+ public String entityType;
+ public String documentId;
- public final Map<String, Object> data;
+ public Map<String, Object> data;
public IndexRequest( final String writeAlias, final String entityType, final String documentId,
@@ -48,13 +51,18 @@ public class IndexRequest implements BatchRequest {
}
- public void doOperation(final Client client, final BulkRequestBuilder bulkRequest ){
- IndexRequestBuilder builder =
- client.prepareIndex(writeAlias, entityType, documentId).setSource( data );
+ /**
+ * DO NOT DELETE! Required for Jackson
+ */
+ public IndexRequest() {
+ }
- bulkRequest.add( builder );
+ public void doOperation( final Client client, final BulkRequestBuilder bulkRequest ) {
+ IndexRequestBuilder builder = client.prepareIndex( writeAlias, entityType, documentId ).setSource( data );
+
+ bulkRequest.add( builder );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
index 37b5e90..215ff57 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
@@ -141,7 +141,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
EsTestUtils.waitForTasks(personLikesIndex);
- Thread.sleep( 1000 );
+ Thread.sleep( 30000 );
// now, let's search for muffins
CandidateResults likes = personLikesIndex
@@ -271,7 +271,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
personLikesIndex.refresh();
EsTestUtils.waitForTasks( personLikesIndex );
- Thread.sleep( 1000 );
+ Thread.sleep( 30000 );
// now, let's search for muffins
CandidateResults likes = personLikesIndex.search( searchScope,
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index f202fda..10aa621 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -95,7 +95,8 @@ public class SQSQueueManagerImpl implements QueueManager {
sqs.setRegion(region);
smileFactory.delegateToTextual(true);
mapper = new ObjectMapper( smileFactory );
- mapper.enable(SerializationFeature.INDENT_OUTPUT);
+ //pretty print, disabling for speed
+// mapper.enable(SerializationFeature.INDENT_OUTPUT);
mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
} catch ( Exception e ) {
LOG.warn("failed to setup SQS",e);
@@ -180,7 +181,10 @@ public class SQSQueueManagerImpl implements QueueManager {
}
String url = getQueue().getUrl();
LOG.info("Sending Message...{} to {}",body.toString(),url);
- SendMessageRequest request = new SendMessageRequest(url,toString((Serializable)body));
+
+ final String stringBody = toString(body);
+
+ SendMessageRequest request = new SendMessageRequest(url, stringBody);
sqs.sendMessage(request);
}