You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/14 18:54:02 UTC
[06/39] usergrid git commit: remove threading
remove threading
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2a168278
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2a168278
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2a168278
Branch: refs/heads/usergrid-1007-shiro-cache
Commit: 2a16827831f65b61c7de25be262551fe87928425
Parents: 805113c
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 15 13:45:08 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Sep 24 15:00:45 2015 -0600
----------------------------------------------------------------------
.../core/future/FutureObservable.java | 44 -------------
.../persistence/index/EntityIndexBatch.java | 1 -
.../index/impl/EsIndexBufferConsumerImpl.java | 65 +-------------------
.../index/impl/IndexOperationMessage.java | 18 ------
4 files changed, 1 insertion(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a168278/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
deleted file mode 100644
index 06eed4d..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence.core.future;
-
-
-import rx.Observable;
-
-import java.util.concurrent.FutureTask;
-
-
-/**
- * Future without the exception nastiness
- */
-public class FutureObservable<T> {
-
- private final FutureTask<T> future;
-
-
- public FutureObservable(final T returnVal) {
- future = new FutureTask<>( () -> returnVal );
- }
-
- public void done() {
- future.run();
- }
-
- public Observable<T> observable() {
- return Observable.from(future);
- }
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a168278/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index 85b234a..d1b076d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -20,7 +20,6 @@ package org.apache.usergrid.persistence.index;/*
import java.util.UUID;
-import org.apache.usergrid.persistence.core.future.FutureObservable;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a168278/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 91fab2f..93c0d85 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -33,7 +33,6 @@ import org.elasticsearch.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.persistence.core.future.FutureObservable;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.IndexFig;
@@ -62,8 +61,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
private final Timer flushTimer;
private final IndexFig indexFig;
private final Counter indexSizeCounter;
- private final Timer offerTimer;
- private final BufferProducer bufferProducer;
private final Histogram roundtripTimer;
private final Timer indexTimer;
@@ -76,7 +73,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
final MetricsFactory metricsFactory, final IndexFig indexFig ) {
this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index_buffer.flush");
this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index_buffer.size");
- this.offerTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index_buffer.producer");
this.roundtripTimer = metricsFactory.getHistogram(EsIndexBufferConsumerImpl.class, "index_buffer.message_cycle");
//wire up the gauge of inflight messages
@@ -90,11 +86,9 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
this.client = provider.getClient();
this.indexFig = indexFig;
- this.bufferProducer = new BufferProducer();
//batch up sets of some size and send them in batch
- startSubscription();
}
@@ -102,34 +96,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
Preconditions.checkNotNull(message, "Message cannot be null");
indexSizeCounter.inc(message.getDeIndexRequests().size());
indexSizeCounter.inc(message.getIndexRequests().size());
- Timer.Context time = offerTimer.time();
- bufferProducer.send(message);
- time.stop();
- return message.observable();
- }
-
-
- /**
- * Start the subscription
- */
- private void startSubscription() {
-
- //buffer on our new thread with a timeout
- final Observable<IndexOperationMessage> observable = Observable.create(bufferProducer);
- observable.subscribeOn(Schedulers.io()).flatMap(indexOpBuffer -> {
-
- //hand off to processor in new observable thread so we can continue to buffer faster
- return Observable.just(indexOpBuffer).flatMap(
- indexOpBufferObservable -> ObservableTimer.time(processBatch(indexOpBufferObservable), flushTimer)
- )
-
- //use the I/O scheduler for thread re-use and efficiency in context switching, then use our concurrent
- // flatmap count for higher throughput of batches once buffered
- .subscribeOn(Schedulers.io());
- }, indexFig.getIndexFlushWorkerCount())
- //start in the background
- .subscribe();
+ return processBatch(message);
}
@@ -140,10 +108,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
*/
private Observable<IndexOperationMessage> processBatch( final IndexOperationMessage batch ) {
-
//take our stream of batches, then stream them into individual ops for consumption on ES
-
-
final Set<IndexOperation> indexOperationSet = batch.getIndexRequests();
final Set<DeIndexOperation> deIndexOperationSet = batch.getDeIndexRequests();
@@ -186,7 +151,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
//subscribe to the operations that generate requests on a new thread so that we can execute them quickly
//mark this as done
return processedIndexOperations.doOnNext(processedIndexOp -> {
- processedIndexOp.done();
roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime());
});
}
@@ -251,31 +215,4 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
+ "entries" );
}
}
-
-
- public static class BufferProducer implements Observable.OnSubscribe<IndexOperationMessage> {
-
- private Subscriber<? super IndexOperationMessage> subscriber;
-
-
- /**
- * Send the data through the buffer
- */
- public void send( final IndexOperationMessage indexOps ) {
- try {
- subscriber.onNext( indexOps );
- }catch(Exception e){
- //re-throws so the caller can determine failover
- log.error( "Unable to process message for indexOp {}, error follows.", indexOps, e );
- throw e;
- }
- }
-
-
- @Override
- public void call( final Subscriber<? super IndexOperationMessage> subscriber ) {
- //just assigns for later use, doesn't do anything else
- this.subscriber = subscriber;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2a168278/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index 0a49626..bd2bec8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -24,10 +24,7 @@ import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
-import org.apache.usergrid.persistence.core.future.FutureObservable;
-
import com.fasterxml.jackson.annotation.JsonIgnore;
-import rx.Observable;
/**
@@ -40,13 +37,11 @@ public class IndexOperationMessage implements Serializable {
private long creationTime;
- private final FutureObservable<IndexOperationMessage> containerFuture;
public IndexOperationMessage() {
this.indexRequests = new HashSet<>();
this.deIndexRequests = new HashSet<>();
- this.containerFuture = new FutureObservable<>( this );
this.creationTime = System.currentTimeMillis();
}
@@ -78,15 +73,6 @@ public class IndexOperationMessage implements Serializable {
return indexRequests.isEmpty() && deIndexRequests.isEmpty();
}
- /**
- * return the promise
- */
- @JsonIgnore
- public Observable<IndexOperationMessage> observable() {
- return containerFuture.observable();
- }
-
-
@Override
public boolean equals( final Object o ) {
if ( this == o ) {
@@ -116,10 +102,6 @@ public class IndexOperationMessage implements Serializable {
return result;
}
- public void done() {
- //if this has been serialized, it could be null. don't NPE if it is, there's nothing to ack
- containerFuture.done();
- }
public long getCreationTime() {
return creationTime;