You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/14 18:54:02 UTC

[06/39] usergrid git commit: remove threading

remove threading


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2a168278
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2a168278
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2a168278

Branch: refs/heads/usergrid-1007-shiro-cache
Commit: 2a16827831f65b61c7de25be262551fe87928425
Parents: 805113c
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 15 13:45:08 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 15:00:45 2015 -0600

----------------------------------------------------------------------
 .../core/future/FutureObservable.java           | 44 -------------
 .../persistence/index/EntityIndexBatch.java     |  1 -
 .../index/impl/EsIndexBufferConsumerImpl.java   | 65 +-------------------
 .../index/impl/IndexOperationMessage.java       | 18 ------
 4 files changed, 1 insertion(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a168278/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
deleted file mode 100644
index 06eed4d..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence.core.future;
-
-
-import rx.Observable;
-
-import java.util.concurrent.FutureTask;
-
-
-/**
- * Future without the exception nastiness
- */
-public class FutureObservable<T> {
-
-    private final FutureTask<T> future;
-
-
-    public FutureObservable(final T returnVal) {
-        future = new FutureTask<>( () -> returnVal );
-    }
-
-    public void done() {
-        future.run();
-    }
-
-    public Observable<T> observable() {
-        return  Observable.from(future);
-    }
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a168278/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index 85b234a..d1b076d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -20,7 +20,6 @@ package org.apache.usergrid.persistence.index;/*
 
 import java.util.UUID;
 
-import org.apache.usergrid.persistence.core.future.FutureObservable;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a168278/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 91fab2f..93c0d85 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -33,7 +33,6 @@ import org.elasticsearch.client.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.persistence.core.future.FutureObservable;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.IndexFig;
 
@@ -62,8 +61,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
     private final Timer flushTimer;
     private final IndexFig indexFig;
     private final Counter indexSizeCounter;
-    private final Timer offerTimer;
-    private final BufferProducer bufferProducer;
     private final Histogram roundtripTimer;
     private final Timer indexTimer;
 
@@ -76,7 +73,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                                       final MetricsFactory metricsFactory, final IndexFig indexFig ) {
         this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index_buffer.flush");
         this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index_buffer.size");
-        this.offerTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index_buffer.producer");
         this.roundtripTimer = metricsFactory.getHistogram(EsIndexBufferConsumerImpl.class, "index_buffer.message_cycle");
 
         //wire up the gauge of inflight messages
@@ -90,11 +86,9 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         this.client = provider.getClient();
         this.indexFig = indexFig;
 
-        this.bufferProducer = new BufferProducer();
 
         //batch up sets of some size and send them in batch
 
-        startSubscription();
     }
 
 
@@ -102,34 +96,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         Preconditions.checkNotNull(message, "Message cannot be null");
         indexSizeCounter.inc(message.getDeIndexRequests().size());
         indexSizeCounter.inc(message.getIndexRequests().size());
-        Timer.Context time = offerTimer.time();
-        bufferProducer.send(message);
-        time.stop();
-        return  message.observable();
-    }
-
-
-    /**
-     * Start the subscription
-     */
-    private void startSubscription() {
-
-        //buffer on our new thread with a timeout
-        final Observable<IndexOperationMessage> observable = Observable.create(bufferProducer);
 
-        observable.subscribeOn(Schedulers.io()).flatMap(indexOpBuffer -> {
-
-            //hand off to processor in new observable thread so we can continue to buffer faster
-            return Observable.just(indexOpBuffer).flatMap(
-                indexOpBufferObservable -> ObservableTimer.time(processBatch(indexOpBufferObservable), flushTimer)
-            )
-
-                //use the I/O scheduler for thread re-use and efficiency in context switching then use our concurrent
-                // flatmap count or higher throughput of batches once buffered
-                .subscribeOn(Schedulers.io());
-        }, indexFig.getIndexFlushWorkerCount())
-            //start in the background
-            .subscribe();
+        return  processBatch(message);
     }
 
 
@@ -140,10 +108,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
      */
     private Observable<IndexOperationMessage> processBatch( final IndexOperationMessage batch ) {
 
-
         //take our stream of batches, then stream then into individual ops for consumption on ES
-
-
         final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
         final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
 
@@ -186,7 +151,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         //subscribe to the operations that generate requests on a new thread so that we can execute them quickly
         //mark this as done
         return processedIndexOperations.doOnNext(processedIndexOp -> {
-            processedIndexOp.done();
             roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime());
         });
     }
@@ -251,31 +215,4 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                     + "entries" );
         }
     }
-
-
-    public static class BufferProducer implements Observable.OnSubscribe<IndexOperationMessage> {
-
-        private Subscriber<? super IndexOperationMessage> subscriber;
-
-
-        /**
-         * Send the data through the buffer
-         */
-        public void send( final IndexOperationMessage indexOps ) {
-            try {
-                subscriber.onNext( indexOps );
-            }catch(Exception e){
-                //re-throws so the caller can determine failover
-                log.error( "Unable to process message for indexOp {}, error follows.", indexOps, e );
-                throw e;
-            }
-        }
-
-
-        @Override
-        public void call( final Subscriber<? super IndexOperationMessage> subscriber ) {
-            //just assigns for later use, doesn't do anything else
-            this.subscriber = subscriber;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a168278/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index 0a49626..bd2bec8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -24,10 +24,7 @@ import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.usergrid.persistence.core.future.FutureObservable;
-
 import com.fasterxml.jackson.annotation.JsonIgnore;
-import rx.Observable;
 
 
 /**
@@ -40,13 +37,11 @@ public class IndexOperationMessage implements Serializable {
     private long creationTime;
 
 
-    private final FutureObservable<IndexOperationMessage> containerFuture;
 
 
     public IndexOperationMessage() {
         this.indexRequests = new HashSet<>();
         this.deIndexRequests = new HashSet<>();
-        this.containerFuture = new FutureObservable<>( this );
         this.creationTime = System.currentTimeMillis();
     }
 
@@ -78,15 +73,6 @@ public class IndexOperationMessage implements Serializable {
         return indexRequests.isEmpty() && deIndexRequests.isEmpty();
     }
 
-    /**
-     * return the promise
-     */
-    @JsonIgnore
-    public Observable<IndexOperationMessage> observable() {
-        return containerFuture.observable();
-    }
-
-
     @Override
     public boolean equals( final Object o ) {
         if ( this == o ) {
@@ -116,10 +102,6 @@ public class IndexOperationMessage implements Serializable {
         return result;
     }
 
-    public void done() {
-        //if this has been serialized, it could be null. don't NPE if it is, there's nothing to ack
-        containerFuture.done();
-    }
 
     public long getCreationTime() {
         return creationTime;