You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/17 00:37:31 UTC

[03/50] incubator-usergrid git commit: First pass and refactoring Queue between producer and consumer.

First pass and refactoring Queue between producer and consumer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b9d17b89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b9d17b89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b9d17b89

Branch: refs/heads/two-dot-o
Commit: b9d17b89bb406896ca4f8c1e67055572c7be0d2c
Parents: 06e7ad6
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 13:40:29 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 13:40:29 2015 -0600

----------------------------------------------------------------------
 .../persistence/index/IndexBufferProducer.java  |  1 -
 .../persistence/index/impl/BufferQueue.java     | 49 +++++++++++
 .../index/impl/BufferQueueInMemory.java         | 85 ++++++++++++++++++
 .../index/impl/EsEntityIndexImpl.java           |  1 +
 .../index/impl/EsIndexBufferConsumerImpl.java   | 93 ++++++++++----------
 .../index/impl/EsIndexBufferProducerImpl.java   | 13 ++-
 6 files changed, 186 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b9d17b89/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
index 19d224c..7c8b7e6 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferProducer.java
@@ -34,5 +34,4 @@ public interface IndexBufferProducer {
 
     BetterFuture put(IndexOperationMessage message);
 
-    BlockingQueue<IndexOperationMessage> getSource();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b9d17b89/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
new file mode 100644
index 0000000..dec6ac3
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+
+
+/**
+ * Temporary abstraction over the index buffer queue that decouples the producer from the consumer.
+ */
+public interface BufferQueue {
+
+    /**
+     * Offers an index operation message to the queue.
+     * @param operation the message to enqueue
+     */
+    void offer( IndexOperationMessage operation );
+
+
+    /**
+     * Performs a take, potentially blocking until takeSize messages are available or the timeout has elapsed.
+     * @param takeSize the number of messages to wait for
+     * @param timeout how long to wait before giving up
+     * @param timeUnit the unit of timeout
+     * @return the messages that were taken
+     */
+    List<IndexOperationMessage> take( int takeSize, long timeout, TimeUnit timeUnit );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b9d17b89/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
new file mode 100644
index 0000000..502f45d
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/** In-memory {@link BufferQueue} backed by a bounded {@link ArrayBlockingQueue}. */
+@Singleton
+public class BufferQueueInMemory implements BufferQueue {
+
+    private final ArrayBlockingQueue<IndexOperationMessage> messages;
+    private final IndexFig fig;
+
+    @Inject
+    public BufferQueueInMemory( final ArrayBlockingQueue<IndexOperationMessage> messages, final IndexFig fig ) {
+        this.messages = messages;
+        this.fig = fig;
+    }
+
+    /** Enqueues without blocking; NOTE(review): the message is silently dropped when the queue is full. */
+    @Override
+    public void offer( final IndexOperationMessage operation ) {
+        messages.offer( operation );
+    }
+
+    /**
+     * Collects up to takeSize messages, waiting at most the given timeout in total.
+     *
+     * @param takeSize number of messages after which the call returns early
+     * @param timeout overall time budget, shared by every wait inside this call
+     * @param timeUnit unit of timeout
+     * @return the messages collected so far; empty, never null, when nothing arrived in time
+     */
+    @Override
+    public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
+        final List<IndexOperationMessage> response = new ArrayList<>( takeSize );
+        final long deadline = System.nanoTime() + timeUnit.toNanos( timeout );
+        //loop until we're full or the overall time budget is spent
+        do {
+            try {
+                //wait only for what is left of the budget so repeated polls cannot overrun the timeout
+                final long remaining = Math.max( 0L, deadline - System.nanoTime() );
+                final IndexOperationMessage polled = messages.poll( remaining, TimeUnit.NANOSECONDS );
+                //we received one, grab whatever else is already queued without blocking
+                if ( polled != null ) {
+                    response.add( polled );
+                    messages.drainTo( response, takeSize - response.size() );
+                }
+            }
+            catch ( InterruptedException ie ) {
+                //take() cannot signal interruption to its caller, so keep waiting until the deadline
+            }
+        }
+        while ( response.size() < takeSize && deadline - System.nanoTime() > 0 );
+        return response;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b9d17b89/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index c92b299..72618b4 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -550,6 +550,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
         if ( candidates.size() >= query.getLimit() ) {
             //USERGRID-461 our cursor is getting too large, map it to a new time UUID
+            //TODO T.N., this shouldn't live here. This should live at the UG core tier.  However the RM/EM are an absolute mess, so until they're refactored, this is its home
 
             final String userCursorString = org.apache.usergrid.persistence.index.utils.StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b9d17b89/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 94ea71e..19b8438 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -67,9 +67,13 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
     private final Counter indexSizeCounter;
     private final Meter flushMeter;
     private final Timer produceTimer;
+    private final BufferQueue bufferQueue;
 
     @Inject
-    public EsIndexBufferConsumerImpl(final IndexFig config, final IndexBufferProducer producer, final EsProvider provider, final MetricsFactory metricsFactory){
+    public EsIndexBufferConsumerImpl( final IndexFig config, final IndexBufferProducer producer, final EsProvider
+        provider, final MetricsFactory metricsFactory,
+                                      final BufferQueue bufferQueue ){
+        this.bufferQueue = bufferQueue;
         this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
         this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "index.buffer.meter");
         this.indexSizeCounter =  metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index.buffer.size");
@@ -77,61 +81,56 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         this.failureMonitor = new FailureMonitorImpl(config,provider);
         this.client = provider.getClient();
         this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
-        final BlockingQueue<IndexOperationMessage> producerQueue = producer.getSource();
 
 
         final AtomicInteger countFail = new AtomicInteger();
         //batch up sets of some size and send them in batch
-        this.consumer = Observable.create(new Observable.OnSubscribe<IndexOperationMessage>() {
+        this.consumer = Observable.create( new Observable.OnSubscribe<IndexOperationMessage>() {
             @Override
-            public void call(final Subscriber<? super IndexOperationMessage> subscriber) {
-                Thread thread = new Thread(new Runnable() {
-                    @Override
-                    public void run() {
-                        List<IndexOperationMessage> drainList = new ArrayList<>(config.getIndexBufferSize() + 1);
-                        do {
-                            try {
-                                IndexOperationMessage polled = producerQueue.poll(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS);
-                                if(polled!=null) {
-                                    Timer.Context timer = produceTimer.time();
-                                    drainList.add(polled);
-                                    producerQueue.drainTo(drainList, config.getIndexBufferSize());
-                                    for(IndexOperationMessage drained : drainList){
-                                        subscriber.onNext(drained);
-                                    }
-                                    drainList.clear();
-                                    timer.stop();
-                                }
-                                countFail.set(0);
-                            } catch (InterruptedException ie) {
-                                int count = countFail.incrementAndGet();
-                                log.error("failed to dequeue", ie);
-                                if(count > 200){
-                                    log.error("Shutting down index drain due to repetitive failures");
-                                    //break;
-                                }
+            public void call( final Subscriber<? super IndexOperationMessage> subscriber ) {
+                List<IndexOperationMessage> drainList;
+                do {
+                    try {
 
-                            }
-                        } while (true);
+
+                        Timer.Context timer = produceTimer.time();
+
+                        drainList = bufferQueue
+                            .take( config.getIndexBufferSize(), config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS );
+
+                        for ( IndexOperationMessage drained : drainList ) {
+                            subscriber.onNext( drained );
+                        }
+                        drainList.clear();
+                        timer.stop();
+
+                        countFail.set( 0 );
                     }
-                });
-                thread.setName("EsEntityIndex_Consumer");
-                thread.start();
-            }
-        })
-            .subscribeOn(Schedulers.io())
-            .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
-            .doOnNext(new Action1<List<IndexOperationMessage>>() {
-                @Override
-                public void call(List<IndexOperationMessage> containerList) {
-                    if (containerList.size() > 0) {
-                        flushMeter.mark(containerList.size());
-                        Timer.Context time = flushTimer.time();
-                        execute(containerList);
-                        time.stop();
+                    catch ( Exception e ) {
+                        int count = countFail.incrementAndGet();
+                        log.error( "failed to dequeue", e );
+                        if ( count > 200 ) {
+                            log.error( "Shutting down index drain due to repetitive failures" );
+                            //break;
+                        }
                     }
                 }
-            });
+                while ( true );
+            }
+        } ).subscribeOn( Schedulers.io() ).buffer( config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS,
+            config.getIndexBufferSize() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
+            @Override
+            public void call( List<IndexOperationMessage> containerList ) {
+                if ( containerList.size() > 0 ) {
+                    flushMeter.mark( containerList.size() );
+                    Timer.Context time = flushTimer.time();
+                    execute( containerList );
+                    time.stop();
+                }
+            }
+        } );
+
+        //start in the background
         consumer.subscribe();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b9d17b89/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
index d4d621f..f9999b2 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
@@ -41,12 +41,13 @@ import java.util.concurrent.BlockingQueue;
 public class EsIndexBufferProducerImpl implements IndexBufferProducer {
 
     private final Counter indexSizeCounter;
-    private final ArrayBlockingQueue<IndexOperationMessage> messages;
+
     private final Timer timer;
+    private final BufferQueue bufferQueue;
 
     @Inject
-    public EsIndexBufferProducerImpl(MetricsFactory metricsFactory,IndexFig fig){
-        this.messages = new ArrayBlockingQueue<>(fig.getIndexQueueSize()*5);
+    public EsIndexBufferProducerImpl( MetricsFactory metricsFactory, IndexFig fig, final BufferQueue bufferQueue ){
+        this.bufferQueue = bufferQueue;
         this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerImpl.class, "index.buffer.size");
         this.timer =  metricsFactory.getTimer(EsIndexBufferProducerImpl.class,"index.buffer.producer.timer");
     }
@@ -55,13 +56,9 @@ public class EsIndexBufferProducerImpl implements IndexBufferProducer {
         Preconditions.checkNotNull(message, "Message cannot be null");
         indexSizeCounter.inc(message.getOperations().size());
         Timer.Context time = timer.time();
-        messages.offer(message);
+        bufferQueue.offer( message );
         time.stop();
         return message.getFuture();
     }
 
-    @Override
-    public BlockingQueue<IndexOperationMessage> getSource() {
-        return messages;
-    }
 }