You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/02/26 00:17:06 UTC

[09/23] incubator-usergrid git commit: removing future, moving around some initialization code

removing future, moving around some initialization code


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3231534d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3231534d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3231534d

Branch: refs/heads/USERGRID-409
Commit: 3231534d75544779074e987bf2184028cb1c5dbd
Parents: 3f637da
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Feb 24 16:02:11 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Feb 24 16:02:11 2015 -0700

----------------------------------------------------------------------
 .../core/future/ObservableFuture.java           | 42 ---------------
 .../index/impl/EsEntityIndexBatchImpl.java      | 54 +++++++++++---------
 .../index/impl/IndexBatchBufferImpl.java        |  7 +--
 3 files changed, 35 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3231534d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java
deleted file mode 100644
index d08dd92..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence.core.future;
-
-import rx.Observable;
-import rx.Subscriber;
-
-/**
- *
- */
-public class ObservableFuture<T> implements Observable.OnSubscribe<T> {
-
-    private Subscriber<? super T> subscriber;
-
-    @Override
-    public void call(Subscriber<? super T> subscriber) {
-        this.subscriber = subscriber;
-    }
-
-    public void done(T t){
-        this.subscriber.onCompleted();
-    }
-
-    public void emit(T t){
-        this.subscriber.onNext(t);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3231534d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 896c038..e008707 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.util.concurrent.Futures;
 import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.index.*;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -104,6 +105,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
         this.alias = indexIdentifier.getAlias();
         this.refresh = config.isForcedRefresh();
+        //constrained
         this.promises = new ConcurrentLinkedQueue<>();
     }
 
@@ -147,10 +149,10 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     public EntityIndexBatch deindex( final IndexScope indexScope, final Id id, final UUID version) {
 
         IndexValidationUtils.validateIndexScope( indexScope );
-        ValidationUtils.verifyIdentity( id );
+        ValidationUtils.verifyIdentity(id);
         ValidationUtils.verifyVersion( version );
 
-        final String context = createContextName( indexScope );
+        final String context = createContextName(indexScope);
         final String entityType = id.getType();
 
         final String indexId = createIndexDocId( id, version, context );
@@ -193,7 +195,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
                    }
                }).toBlocking().last();
 
-        log.debug( "Deindexed Entity with index id " + indexId );
+        log.debug("Deindexed Entity with index id " + indexId);
 
 
         return this;
@@ -213,35 +215,41 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         return deindex( indexScope, entity.getId(), entity.getVersion() );
     }
 
-
-
     @Override
     public void execute() {
-//        indexBatchBuffer.flush();
-        Observable.from(promises)
-                .doOnNext(new Action1<BetterFuture>() {
-                    @Override
-                    public void call(BetterFuture betterFuture) {
-                        betterFuture.get();
-                    }
-                }).toBlocking().lastOrDefault(null);
-        promises.clear();
+       flushFutures();
     }
 
-
-
     @Override
     public void executeAndRefresh() {
-//        indexBatchBuffer.flushAndRefresh();
-        Iterator<BetterFuture> iterator = promises.iterator();
-        while(iterator.hasNext()){
-            iterator.next().get();
-        }
-        promises.clear();
+        flushFutures();
         entityIndex.refresh();
-
     }
 
+    private void flushFutures() {
+        ObservableIterator<BetterFuture> iterator = new ObservableIterator<BetterFuture>("futures") {
+            @Override
+            protected Iterator<BetterFuture> getIterator() {
+                return promises.iterator();
+            }
+        };
+        Observable.create(iterator)
+                .doOnNext(new Action1<BetterFuture>() {
+                    @Override
+                    public void call(BetterFuture betterFuture) {
+                        betterFuture.get();
+                    }
+                })
+                .buffer(100)
+                .doOnNext(new Action1<List<BetterFuture>>() {
+                    @Override
+                    public void call(List<BetterFuture> betterFutures) {
+                        promises.removeAll(betterFutures);
+                    }
+                })
+                .toBlocking()
+                .lastOrDefault(null);
+    }
 
     /**
      * Set the entity as a map with the context

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3231534d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index d7e6bf1..ad14920 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -59,6 +59,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
     private final IndexFig config;
     private final Timer flushTimer;
     private final ArrayBlockingQueue<RequestBuilderContainer> blockingQueue;
+    private final Counter bufferCounter;
     private Observable<List<RequestBuilderContainer>> consumer;
     private Producer producer;
 
@@ -72,7 +73,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
         this.failureMonitor = new FailureMonitorImpl(config,provider);
         this.producer = new Producer();
         this.client = provider.getClient();
-
+        bufferCounter = metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size");
         consumer();
     }
 
@@ -97,7 +98,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
     @Override
     public BetterFuture<ShardReplicationOperationRequestBuilder> put(IndexRequestBuilder builder){
         RequestBuilderContainer container = new RequestBuilderContainer(builder);
-        metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
+        bufferCounter.inc();
         producer.put(container);
         return container.getFuture();
     }
@@ -105,7 +106,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
     @Override
     public BetterFuture<ShardReplicationOperationRequestBuilder> put(DeleteRequestBuilder builder){
         RequestBuilderContainer container = new RequestBuilderContainer(builder);
-        metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
+        bufferCounter.inc();
         producer.put(container);
         return container.getFuture();
     }