You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/10 20:40:32 UTC
incubator-usergrid git commit: First pass and refactoring Queue
between producer and consumer.
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-466 [created] b9d17b89b
First pass and refactoring Queue between producer and consumer.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b9d17b89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b9d17b89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b9d17b89
Branch: refs/heads/USERGRID-466
Commit: b9d17b89bb406896ca4f8c1e67055572c7be0d2c
Parents: 06e7ad6
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 13:40:29 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 13:40:29 2015 -0600
----------------------------------------------------------------------
.../persistence/index/IndexBufferProducer.java | 1 -
.../persistence/index/impl/BufferQueue.java | 49 +++++++++++
.../index/impl/BufferQueueInMemory.java | 85 ++++++++++++++++++
.../index/impl/EsEntityIndexImpl.java | 1 +
.../index/impl/EsIndexBufferConsumerImpl.java | 93 ++++++++++----------
.../index/impl/EsIndexBufferProducerImpl.java | 13 ++-
6 files changed, 186 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b9d17b89/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
index 19d224c..7c8b7e6 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
@@ -34,5 +34,4 @@ public interface IndexBufferProducer {
BetterFuture put(IndexOperationMessage message);
- BlockingQueue<IndexOperationMessage> getSource();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b9d17b89/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
new file mode 100644
index 0000000..dec6ac3
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+
+
+/**
+ * Temporary abstraction over the index buffer queue that decouples the producer from the consumer.
+ */
+public interface BufferQueue {
+
+    /**
+     * Offers an index operation message to the queue.
+     *
+     * @param operation the message to enqueue
+     */
+    void offer( IndexOperationMessage operation );
+
+    /**
+     * Takes messages, potentially blocking until takeSize are available or the timeout has elapsed.
+     * @param takeSize the number of messages to wait for
+     * @param timeout how long to wait, expressed in timeUnit
+     * @param timeUnit the unit of the timeout value
+     * @return the messages that were taken
+     */
+    List<IndexOperationMessage> take( int takeSize, long timeout, TimeUnit timeUnit );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b9d17b89/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
new file mode 100644
index 0000000..502f45d
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/** In-memory {@link BufferQueue} backed by the injected {@link ArrayBlockingQueue}. */
+@Singleton
+public class BufferQueueInMemory implements BufferQueue {
+
+    private final ArrayBlockingQueue<IndexOperationMessage> messages;
+    private final IndexFig fig;
+
+    @Inject
+    public BufferQueueInMemory( final ArrayBlockingQueue<IndexOperationMessage> messages, final IndexFig fig ) {
+        this.messages = messages;
+        this.fig = fig;
+    }
+
+    /** Non-blocking enqueue; the result of the queue's offer is ignored, so a full queue drops the message. */
+    @Override
+    public void offer( final IndexOperationMessage operation ) {
+        messages.offer( operation );
+    }
+
+    /**
+     * Collects up to {@code takeSize} messages, waiting no longer than the given timeout overall.
+     * @param takeSize maximum number of messages to collect before returning
+     * @param timeout total time to wait for messages
+     * @param timeUnit unit of {@code timeout}
+     * @return the messages collected, possibly empty, never null
+     */
+    @Override
+    public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
+        final List<IndexOperationMessage> response = new ArrayList<>( takeSize );
+        final long endTime = System.currentTimeMillis() + timeUnit.toMillis( timeout );
+
+        //loop until we're full or we time out
+        do {
+            try {
+                //wait only for the time left so repeated polls cannot exceed the overall timeout
+                final long remaining = Math.max( 0, endTime - System.currentTimeMillis() );
+                final IndexOperationMessage polled = messages.poll( remaining, TimeUnit.MILLISECONDS );
+                //we received 1, drain whatever else is immediately available
+                if ( polled != null ) {
+                    response.add( polled );
+                    messages.drainTo( response, takeSize - response.size() );
+                }
+            }
+            catch ( InterruptedException ignored ) {
+                //interrupt deliberately ignored: keep collecting until the deadline instead of aborting
+            }
+        }
+        while ( response.size() < takeSize && System.currentTimeMillis() < endTime );
+        return response;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b9d17b89/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index c92b299..72618b4 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -550,6 +550,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
if ( candidates.size() >= query.getLimit() ) {
//USERGRID-461 our cursor is getting too large, map it to a new time UUID
+ //TODO T.N., this shouldn't live here. This should live at the UG core tier. However the RM/EM are an absolute mess, so until they're refactored, this is its home
final String userCursorString = org.apache.usergrid.persistence.index.utils.StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b9d17b89/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 94ea71e..19b8438 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -67,9 +67,13 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
private final Counter indexSizeCounter;
private final Meter flushMeter;
private final Timer produceTimer;
+ private final BufferQueue bufferQueue;
@Inject
- public EsIndexBufferConsumerImpl(final IndexFig config, final IndexBufferProducer producer, final EsProvider provider, final MetricsFactory metricsFactory){
+ public EsIndexBufferConsumerImpl( final IndexFig config, final IndexBufferProducer producer, final EsProvider
+ provider, final MetricsFactory metricsFactory,
+ final BufferQueue bufferQueue ){
+ this.bufferQueue = bufferQueue;
this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "index.buffer.meter");
this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index.buffer.size");
@@ -77,61 +81,56 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
this.failureMonitor = new FailureMonitorImpl(config,provider);
this.client = provider.getClient();
this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
- final BlockingQueue<IndexOperationMessage> producerQueue = producer.getSource();
final AtomicInteger countFail = new AtomicInteger();
//batch up sets of some size and send them in batch
- this.consumer = Observable.create(new Observable.OnSubscribe<IndexOperationMessage>() {
+ this.consumer = Observable.create( new Observable.OnSubscribe<IndexOperationMessage>() {
@Override
- public void call(final Subscriber<? super IndexOperationMessage> subscriber) {
- Thread thread = new Thread(new Runnable() {
- @Override
- public void run() {
- List<IndexOperationMessage> drainList = new ArrayList<>(config.getIndexBufferSize() + 1);
- do {
- try {
- IndexOperationMessage polled = producerQueue.poll(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS);
- if(polled!=null) {
- Timer.Context timer = produceTimer.time();
- drainList.add(polled);
- producerQueue.drainTo(drainList, config.getIndexBufferSize());
- for(IndexOperationMessage drained : drainList){
- subscriber.onNext(drained);
- }
- drainList.clear();
- timer.stop();
- }
- countFail.set(0);
- } catch (InterruptedException ie) {
- int count = countFail.incrementAndGet();
- log.error("failed to dequeue", ie);
- if(count > 200){
- log.error("Shutting down index drain due to repetitive failures");
- //break;
- }
+ public void call( final Subscriber<? super IndexOperationMessage> subscriber ) {
+ List<IndexOperationMessage> drainList;
+ do {
+ try {
- }
- } while (true);
+
+ Timer.Context timer = produceTimer.time();
+
+ drainList = bufferQueue
+ .take( config.getIndexBufferSize(), config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS );
+
+ for ( IndexOperationMessage drained : drainList ) {
+ subscriber.onNext( drained );
+ }
+ drainList.clear();
+ timer.stop();
+
+ countFail.set( 0 );
}
- });
- thread.setName("EsEntityIndex_Consumer");
- thread.start();
- }
- })
- .subscribeOn(Schedulers.io())
- .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
- .doOnNext(new Action1<List<IndexOperationMessage>>() {
- @Override
- public void call(List<IndexOperationMessage> containerList) {
- if (containerList.size() > 0) {
- flushMeter.mark(containerList.size());
- Timer.Context time = flushTimer.time();
- execute(containerList);
- time.stop();
+ catch ( Exception e ) {
+ int count = countFail.incrementAndGet();
+ log.error( "failed to dequeue", e );
+ if ( count > 200 ) {
+ log.error( "Shutting down index drain due to repetitive failures" );
+ //break;
+ }
}
}
- });
+ while ( true );
+ }
+ } ).subscribeOn( Schedulers.io() ).buffer( config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS,
+ config.getIndexBufferSize() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
+ @Override
+ public void call( List<IndexOperationMessage> containerList ) {
+ if ( containerList.size() > 0 ) {
+ flushMeter.mark( containerList.size() );
+ Timer.Context time = flushTimer.time();
+ execute( containerList );
+ time.stop();
+ }
+ }
+ } );
+
+ //start in the background
consumer.subscribe();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b9d17b89/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
index d4d621f..f9999b2 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
@@ -41,12 +41,13 @@ import java.util.concurrent.BlockingQueue;
public class EsIndexBufferProducerImpl implements IndexBufferProducer {
private final Counter indexSizeCounter;
- private final ArrayBlockingQueue<IndexOperationMessage> messages;
+
private final Timer timer;
+ private final BufferQueue bufferQueue;
@Inject
- public EsIndexBufferProducerImpl(MetricsFactory metricsFactory,IndexFig fig){
- this.messages = new ArrayBlockingQueue<>(fig.getIndexQueueSize()*5);
+ public EsIndexBufferProducerImpl( MetricsFactory metricsFactory, IndexFig fig, final BufferQueue bufferQueue ){
+ this.bufferQueue = bufferQueue;
this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerImpl.class, "index.buffer.size");
this.timer = metricsFactory.getTimer(EsIndexBufferProducerImpl.class,"index.buffer.producer.timer");
}
@@ -55,13 +56,9 @@ public class EsIndexBufferProducerImpl implements IndexBufferProducer {
Preconditions.checkNotNull(message, "Message cannot be null");
indexSizeCounter.inc(message.getOperations().size());
Timer.Context time = timer.time();
- messages.offer(message);
+ bufferQueue.offer( message );
time.stop();
return message.getFuture();
}
- @Override
- public BlockingQueue<IndexOperationMessage> getSource() {
- return messages;
- }
}