You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/02/26 00:17:06 UTC
[09/23] incubator-usergrid git commit: removing future, moving around some initialization code
removing future, moving around some initialization code
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3231534d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3231534d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3231534d
Branch: refs/heads/USERGRID-409
Commit: 3231534d75544779074e987bf2184028cb1c5dbd
Parents: 3f637da
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Feb 24 16:02:11 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Feb 24 16:02:11 2015 -0700
----------------------------------------------------------------------
.../core/future/ObservableFuture.java | 42 ---------------
.../index/impl/EsEntityIndexBatchImpl.java | 54 +++++++++++---------
.../index/impl/IndexBatchBufferImpl.java | 7 +--
3 files changed, 35 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3231534d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java
deleted file mode 100644
index d08dd92..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence.core.future;
-
-import rx.Observable;
-import rx.Subscriber;
-
-/**
- *
- */
-public class ObservableFuture<T> implements Observable.OnSubscribe<T> {
-
- private Subscriber<? super T> subscriber;
-
- @Override
- public void call(Subscriber<? super T> subscriber) {
- this.subscriber = subscriber;
- }
-
- public void done(T t){
- this.subscriber.onCompleted();
- }
-
- public void emit(T t){
- this.subscriber.onNext(t);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3231534d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 896c038..e008707 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.util.concurrent.Futures;
import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.index.*;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -104,6 +105,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
this.alias = indexIdentifier.getAlias();
this.refresh = config.isForcedRefresh();
+ //constrained
this.promises = new ConcurrentLinkedQueue<>();
}
@@ -147,10 +149,10 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
public EntityIndexBatch deindex( final IndexScope indexScope, final Id id, final UUID version) {
IndexValidationUtils.validateIndexScope( indexScope );
- ValidationUtils.verifyIdentity( id );
+ ValidationUtils.verifyIdentity(id);
ValidationUtils.verifyVersion( version );
- final String context = createContextName( indexScope );
+ final String context = createContextName(indexScope);
final String entityType = id.getType();
final String indexId = createIndexDocId( id, version, context );
@@ -193,7 +195,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
}
}).toBlocking().last();
- log.debug( "Deindexed Entity with index id " + indexId );
+ log.debug("Deindexed Entity with index id " + indexId);
return this;
@@ -213,35 +215,41 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
return deindex( indexScope, entity.getId(), entity.getVersion() );
}
-
-
@Override
public void execute() {
-// indexBatchBuffer.flush();
- Observable.from(promises)
- .doOnNext(new Action1<BetterFuture>() {
- @Override
- public void call(BetterFuture betterFuture) {
- betterFuture.get();
- }
- }).toBlocking().lastOrDefault(null);
- promises.clear();
+ flushFutures();
}
-
-
@Override
public void executeAndRefresh() {
-// indexBatchBuffer.flushAndRefresh();
- Iterator<BetterFuture> iterator = promises.iterator();
- while(iterator.hasNext()){
- iterator.next().get();
- }
- promises.clear();
+ flushFutures();
entityIndex.refresh();
-
}
+ private void flushFutures() {
+ ObservableIterator<BetterFuture> iterator = new ObservableIterator<BetterFuture>("futures") {
+ @Override
+ protected Iterator<BetterFuture> getIterator() {
+ return promises.iterator();
+ }
+ };
+ Observable.create(iterator)
+ .doOnNext(new Action1<BetterFuture>() {
+ @Override
+ public void call(BetterFuture betterFuture) {
+ betterFuture.get();
+ }
+ })
+ .buffer(100)
+ .doOnNext(new Action1<List<BetterFuture>>() {
+ @Override
+ public void call(List<BetterFuture> betterFutures) {
+ promises.removeAll(betterFutures);
+ }
+ })
+ .toBlocking()
+ .lastOrDefault(null);
+ }
/**
* Set the entity as a map with the context
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3231534d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index d7e6bf1..ad14920 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -59,6 +59,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
private final IndexFig config;
private final Timer flushTimer;
private final ArrayBlockingQueue<RequestBuilderContainer> blockingQueue;
+ private final Counter bufferCounter;
private Observable<List<RequestBuilderContainer>> consumer;
private Producer producer;
@@ -72,7 +73,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
this.failureMonitor = new FailureMonitorImpl(config,provider);
this.producer = new Producer();
this.client = provider.getClient();
-
+ bufferCounter = metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size");
consumer();
}
@@ -97,7 +98,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
@Override
public BetterFuture<ShardReplicationOperationRequestBuilder> put(IndexRequestBuilder builder){
RequestBuilderContainer container = new RequestBuilderContainer(builder);
- metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
+ bufferCounter.inc();
producer.put(container);
return container.getFuture();
}
@@ -105,7 +106,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
@Override
public BetterFuture<ShardReplicationOperationRequestBuilder> put(DeleteRequestBuilder builder){
RequestBuilderContainer container = new RequestBuilderContainer(builder);
- metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
+ bufferCounter.inc();
producer.put(container);
return container.getFuture();
}