You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/03 01:21:53 UTC
[05/27] incubator-usergrid git commit: add comments
add comments
removing metrics factory
remove synchronization
moving towards futures
moving towards futures
Adding Batch Buffer
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/090eb1bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/090eb1bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/090eb1bc
Branch: refs/heads/USERGRID-280
Commit: 090eb1bc9884dabee1ab2a4f12b889b1ac84aa6d
Parents: bf63116
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Feb 17 17:10:28 2015 -0800
Committer: Shawn Feldman <sf...@apache.org>
Committed: Sat Feb 21 08:39:21 2015 -0500
----------------------------------------------------------------------
.../batch/service/JobSchedulerService.java | 2 +-
.../apache/usergrid/metrics/MetricsFactory.java | 113 ---------
.../main/resources/usergrid-core-context.xml | 4 +-
.../persistence/core/future/BetterFuture.java | 42 ++++
.../core/future/ObservableFuture.java | 42 ++++
.../persistence/index/EntityIndexBatch.java | 26 ++-
.../persistence/index/IndexBatchBuffer.java | 28 +--
.../usergrid/persistence/index/IndexFig.java | 24 +-
.../index/impl/EsEntityIndexBatchImpl.java | 38 ++--
.../index/impl/IndexBatchBufferImpl.java | 227 ++++++++++---------
.../persistence/index/impl/EntityIndexTest.java | 52 ++++-
.../usergrid/services/guice/ServiceModule.java | 2 -
.../notifications/NotificationsService.java | 4 +-
.../services/notifications/QueueJob.java | 6 +-
.../services/notifications/QueueListener.java | 6 +-
.../impl/ApplicationQueueManagerImpl.java | 2 +-
.../services/queues/ImportQueueListener.java | 7 +-
.../usergrid/services/queues/QueueListener.java | 7 +-
.../resources/usergrid-services-context.xml | 1 -
.../apns/NotificationsServiceIT.java | 2 +-
.../gcm/NotificationsServiceIT.java | 4 +-
21 files changed, 349 insertions(+), 290 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java b/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
index 2ec0be0..5d57ab7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +37,6 @@ import org.apache.usergrid.batch.JobFactory;
import org.apache.usergrid.batch.JobNotFoundException;
import org.apache.usergrid.batch.repository.JobAccessor;
import org.apache.usergrid.batch.repository.JobDescriptor;
-import org.apache.usergrid.metrics.MetricsFactory;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/core/src/main/java/org/apache/usergrid/metrics/MetricsFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/metrics/MetricsFactory.java b/stack/core/src/main/java/org/apache/usergrid/metrics/MetricsFactory.java
deleted file mode 100644
index fccc296..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/metrics/MetricsFactory.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.metrics;
-import com.codahale.metrics.*;
-import com.codahale.metrics.graphite.Graphite;
-import com.codahale.metrics.graphite.GraphiteReporter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PostConstruct;
-import java.net.InetSocketAddress;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Singleton class to manage metrics.
- * TODO:remove this in favor of cp one
- */
-@Component("metricsFactory")
-public class MetricsFactory implements org.apache.usergrid.persistence.core.metrics.MetricsFactory {
- @Autowired
- private Properties properties;
- public MetricRegistry registry;
- private GraphiteReporter graphiteReporter;
- private JmxReporter jmxReporter;
- private ConcurrentHashMap<String,Metric> hashMap;
- private static final Logger LOG = LoggerFactory.getLogger(MetricsFactory.class);
-
- public MetricsFactory() {
-
- }
-
- @PostConstruct
- void init() {
- registry = new MetricRegistry();
- String badHost = "false";
- String metricsHost = properties.getProperty("usergrid.metrics.graphite.host", badHost);
- if(!metricsHost.equals(badHost)) {
- Graphite graphite = new Graphite(new InetSocketAddress(metricsHost, 2003));
- graphiteReporter = GraphiteReporter.forRegistry(registry)
- .prefixedWith("notifications")
- .convertRatesTo(TimeUnit.SECONDS)
- .convertDurationsTo(TimeUnit.MILLISECONDS)
- .filter(MetricFilter.ALL)
- .build(graphite);
- graphiteReporter.start(30, TimeUnit.SECONDS);
- }else {
- LOG.warn("MetricsService:Logger not started.");
- }
- hashMap = new ConcurrentHashMap<String, Metric>();
-
- jmxReporter = JmxReporter.forRegistry(registry).build();
- jmxReporter.start();
- }
-
- public MetricRegistry getRegistry() {
- return registry;
- }
-
- public Timer getTimer(Class<?> klass, String name) {
- return getMetric(Timer.class, klass, name);
- }
-
- public Histogram getHistogram(Class<?> klass, String name) {
- return getMetric(Histogram.class, klass, name);
- }
-
- public Counter getCounter(Class<?> klass, String name) {
- return getMetric(Counter.class, klass, name);
- }
-
- public Meter getMeter(Class<?> klass, String name) {
- return getMetric(Meter.class, klass, name);
- }
-
- private <T> T getMetric(Class<T> metricClass, Class<?> klass, String name) {
- String key = metricClass.getName() + klass.getName() + name;
- Metric metric = hashMap.get(key);
- if (metric == null) {
- if (metricClass == Histogram.class) {
- metric = this.getRegistry().histogram(MetricRegistry.name(klass, name));
- }
- if (metricClass == Timer.class) {
- metric = this.getRegistry().timer(MetricRegistry.name(klass, name));
- }
- if (metricClass == Meter.class) {
- metric = this.getRegistry().meter(MetricRegistry.name(klass, name));
- }
- if (metricClass == Counter.class) {
- metric = this.getRegistry().counter(MetricRegistry.name(klass, name));
- }
- hashMap.put(key, metric);
- }
- return (T) metric;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/core/src/main/resources/usergrid-core-context.xml
----------------------------------------------------------------------
diff --git a/stack/core/src/main/resources/usergrid-core-context.xml b/stack/core/src/main/resources/usergrid-core-context.xml
index 60f69af..ece6215 100644
--- a/stack/core/src/main/resources/usergrid-core-context.xml
+++ b/stack/core/src/main/resources/usergrid-core-context.xml
@@ -184,7 +184,6 @@
<bean id="jobSchedulerBackgroundService" class="org.apache.usergrid.batch.service.JobSchedulerService">
<property name="jobFactory" ref="jobFactory" />
<property name="jobAccessor" ref="schedulerService" />
- <property name="metricsFactory" ref="metricsFactory"/>
<property name="workerSize" value="${usergrid.scheduler.job.workers}" />
<property name="interval" value="${usergrid.scheduler.job.interval}" />
<property name="maxFailCount" value="${usergrid.scheduler.job.maxfail}" />
@@ -196,8 +195,7 @@
</bean>
<bean id="jobFactory" class="org.apache.usergrid.batch.UsergridJobFactory" />
-
- <bean id="metricsFactory" class="org.apache.usergrid.metrics.MetricsFactory" scope="singleton"/>
+
<context:component-scan base-package="org.apache.usergrid.batch.job" />
<context:annotation-config />
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
new file mode 100644
index 0000000..6146fe8
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.core.future;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+/**
+ * Future without the exception nastiness
+ */
+public class BetterFuture<T>{
+ FutureTask<T> future;
+ public BetterFuture(Callable<T> callable){
+ future = new FutureTask<>(callable);
+ }
+
+ public void done(){
+ future.run();
+ }
+
+ public T get(){
+ try {
+ return future.get();
+ }catch (Exception e){
+ throw new RuntimeException(e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java
new file mode 100644
index 0000000..d08dd92
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.core.future;
+
+import rx.Observable;
+import rx.Subscriber;
+
+/**
+ *
+ */
+public class ObservableFuture<T> implements Observable.OnSubscribe<T> {
+
+ private Subscriber<? super T> subscriber;
+
+ @Override
+ public void call(Subscriber<? super T> subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ public void done(T t){
+ this.subscriber.onCompleted();
+ }
+
+ public void emit(T t){
+ this.subscriber.onNext(t);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index 68008bf..f5b8abc 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -29,37 +29,41 @@ public interface EntityIndexBatch {
/**
* Create index for Entity
+ *
* @param indexScope The scope for the index
- * @param entity Entity to be indexed.
+ * @param entity Entity to be indexed.
*/
- public EntityIndexBatch index( final IndexScope indexScope, final Entity entity );
+ public EntityIndexBatch index(final IndexScope indexScope, final Entity entity);
/**
* Remove index of entity
- * @param scope The scope for the entity
+ *
+ * @param scope The scope for the entity
* @param entity Entity to be removed from index.
*/
- public EntityIndexBatch deindex(final IndexScope scope, final Entity entity );
+ public EntityIndexBatch deindex(final IndexScope scope, final Entity entity);
/**
* Remove index of entity.
- * @param scope The scope to use for removal
+ *
+ * @param scope The scope to use for removal
* @param result CandidateResult to be removed from index.
*/
- public EntityIndexBatch deindex(final IndexScope scope, final CandidateResult result );
+ public EntityIndexBatch deindex(final IndexScope scope, final CandidateResult result);
/**
* Remove index of entity.
- * @param scope The scope to remove
- * @param id Id to be removed from index.
+ *
+ * @param scope The scope to remove
+ * @param id Id to be removed from index.
* @param version Version to be removed from index.
*/
public EntityIndexBatch deindex(final IndexScope scope, final Id id, final UUID version);
- /**
- * Execute the batch
- */
+ /**
+ * Execute the batch
+ */
public void execute();
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
index 4a384b4..b24d37b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
@@ -16,34 +16,34 @@
*/
package org.apache.usergrid.persistence.index;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.index.impl.IndexBatchBufferImpl;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
+import rx.Observable;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
/**
- * Buffer for index operations,
+ * * Buffer index requests into sets to send,
*/
public interface IndexBatchBuffer {
/**
- * flush out buffer and execute
- */
- public void flush();
-
- /**
- * flush out buffer and execute
- */
- public void flushAndRefresh();
-
- /**
- * put request into buffer
+ * put request into buffer, retu
+ *
* @param builder
*/
- public void put(IndexRequestBuilder builder);
+ public BetterFuture put(IndexRequestBuilder builder);
/**
* put request into buffer
+ *
* @param builder
*/
- public void put(DeleteRequestBuilder builder);
+ public BetterFuture put(DeleteRequestBuilder builder);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 36ca588..9bdac36 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -53,13 +53,17 @@ public interface IndexFig extends GuicyFig {
public static final String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_size";
+ public static final String INDEX_BATCH_SIZE = "elasticsearch.batch_size";
+
+ public static final String INDEX_WRITE_CONSISTENCY_LEVEL = "elasticsearch.write_consistency_level";
+
/**
* the number of times we can fail before we refresh the client
*/
public static final String ELASTICSEARCH_FAIL_REFRESH = "elasticsearch.fail_refresh";
public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
-
+
@Default( "127.0.0.1" )
@Key( ELASTICSEARCH_HOSTS )
String getHosts();
@@ -68,10 +72,10 @@ public interface IndexFig extends GuicyFig {
@Key( ELASTICSEARCH_PORT )
int getPort();
- @Default( "usergrid" )
+ @Default( "usergrid" )
@Key( ELASTICSEARCH_CLUSTER_NAME)
String getClusterName();
-
+
@Default( "usergrid" ) // no underbars allowed
@Key( ELASTICSEARCH_INDEX_PREFIX )
String getIndexPrefix();
@@ -79,7 +83,7 @@ public interface IndexFig extends GuicyFig {
@Default( "alias" ) // no underbars allowed
@Key( ELASTICSEARCH_ALIAS_POSTFIX )
String getAliasPostfix();
-
+
@Default( "1" ) // TODO: does this timeout get extended on each query?
@Key( QUERY_CURSOR_TIMEOUT_MINUTES )
int getQueryCursorTimeout();
@@ -93,7 +97,7 @@ public interface IndexFig extends GuicyFig {
@Key( QUERY_LIMIT_DEFAULT )
int getQueryLimitDefault();
- @Default( "false" )
+ @Default( "false" )
@Key( ELASTICSEARCH_FORCE_REFRESH )
public boolean isForcedRefresh();
@@ -117,11 +121,19 @@ public interface IndexFig extends GuicyFig {
@Default("2")
int getIndexCacheMaxWorkers();
- @Default("1000")
+ @Default("250")
@Key( INDEX_BUFFER_TIMEOUT )
int getIndexBufferTimeout();
@Default("100")
@Key( INDEX_BUFFER_SIZE )
int getIndexBufferSize();
+
+ @Default("300")
+ @Key( INDEX_BATCH_SIZE)
+ int getIndexBatchSize();
+
+ @Default("one")
+ @Key( INDEX_WRITE_CONSISTENCY_LEVEL )
+ String getWriteConsistencyLevel();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 1bc21d3..896c038 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -18,15 +18,12 @@
*/
package org.apache.usergrid.persistence.index.impl;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.util.concurrent.Futures;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.apache.usergrid.persistence.index.*;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -93,6 +90,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private final FailureMonitor failureMonitor;
private final AliasedEntityIndex entityIndex;
+ private final ConcurrentLinkedQueue<BetterFuture> promises;
public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer,
@@ -106,13 +104,12 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
this.alias = indexIdentifier.getAlias();
this.refresh = config.isForcedRefresh();
+ this.promises = new ConcurrentLinkedQueue<>();
}
@Override
public EntityIndexBatch index( final IndexScope indexScope, final Entity entity ) {
-
-
IndexValidationUtils.validateIndexScope( indexScope );
ValidationUtils.verifyEntityWrite( entity );
@@ -141,7 +138,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
final String entityType = entity.getId().getType();
IndexRequestBuilder builder =
client.prepareIndex(alias.getWriteAlias(), entityType, indexId).setSource( entityAsMap );
- indexBatchBuffer.put(builder);
+ promises.add(indexBatchBuffer.put(builder));
return this;
}
@@ -187,7 +184,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
public Object call(String index) {
try {
DeleteRequestBuilder builder = client.prepareDelete(index, entityType, indexId).setRefresh(refresh);
- indexBatchBuffer.put(builder);
+ promises.add(indexBatchBuffer.put(builder));
}catch (Exception e){
log.error("failed to deindex",e);
throw e;
@@ -220,14 +217,29 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
@Override
public void execute() {
- indexBatchBuffer.flush();
+// indexBatchBuffer.flush();
+ Observable.from(promises)
+ .doOnNext(new Action1<BetterFuture>() {
+ @Override
+ public void call(BetterFuture betterFuture) {
+ betterFuture.get();
+ }
+ }).toBlocking().lastOrDefault(null);
+ promises.clear();
}
@Override
public void executeAndRefresh() {
- indexBatchBuffer.flushAndRefresh();
+// indexBatchBuffer.flushAndRefresh();
+ Iterator<BetterFuture> iterator = promises.iterator();
+ while(iterator.hasNext()){
+ iterator.next().get();
+ }
+ promises.clear();
+ entityIndex.refresh();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index 08ed17b..d7e6bf1 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -20,9 +20,11 @@ import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.IndexBatchBuffer;
import org.apache.usergrid.persistence.index.IndexFig;
+import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
@@ -34,146 +36,179 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
-import rx.Subscription;
+import rx.functions.Action0;
import rx.functions.Action1;
-import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
/**
- * Classy class class.
+ * Buffer index requests into sets to send.
*/
@Singleton
public class IndexBatchBufferImpl implements IndexBatchBuffer {
private static final Logger log = LoggerFactory.getLogger(IndexBatchBufferImpl.class);
+ private final MetricsFactory metricsFactory;
+ private final Counter indexSizeCounter;
private final Client client;
- private final FailureMonitor failureMonitor;
+ private final FailureMonitorImpl failureMonitor;
private final IndexFig config;
- private final boolean refresh;
- private final int timeout;
- private final int bufferSize;
- private final MetricsFactory metricsFactory;
private final Timer flushTimer;
- private final Counter indexSizeCounter;
+ private final ArrayBlockingQueue<RequestBuilderContainer> blockingQueue;
+ private Observable<List<RequestBuilderContainer>> consumer;
private Producer producer;
- private Subscription producerObservable;
- private ArrayBlockingQueue blockingQueue;
@Inject
- public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, MetricsFactory metricsFactory){
+ public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, final MetricsFactory metricsFactory){
this.metricsFactory = metricsFactory;
- this.client = provider.getClient();
- this.failureMonitor = new FailureMonitorImpl( config, provider );
- this.config = config;
- this.producer = new Producer();
- this.refresh = config.isForcedRefresh();
- this.timeout = config.getIndexBufferTimeout();
- this.bufferSize = config.getIndexBufferSize();
this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class, "index.buffer.flush");
this.indexSizeCounter = metricsFactory.getCounter(IndexBatchBuffer.class, "index.buffer.size");
- blockingQueue = new ArrayBlockingQueue(500);
- init();
- }
-
+ this.config = config;
+ this.blockingQueue = new ArrayBlockingQueue<>(config.getIndexBatchSize());
+ this.failureMonitor = new FailureMonitorImpl(config,provider);
+ this.producer = new Producer();
+ this.client = provider.getClient();
+ consumer();
+ }
- private void init() {
- this.producerObservable = Observable.create(producer)
- .doOnNext(new Action1<RequestBuilderContainer>() {
- @Override
- public void call(RequestBuilderContainer container) {
- try {
- blockingQueue.offer(container, 2500, TimeUnit.MILLISECONDS);
- }catch (InterruptedException ie){
- throw new RuntimeException(ie);
- }
- }
- })
- .buffer(timeout, TimeUnit.MILLISECONDS, bufferSize)
+ private void consumer() {
+ //batch up sets of some size and send them in batch
+ this.consumer = Observable.create(producer)
+ .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
.doOnNext(new Action1<List<RequestBuilderContainer>>() {
@Override
- public void call(List<RequestBuilderContainer> builderContainerList) {
+ public void call(List<RequestBuilderContainer> containerList) {
+ for (RequestBuilderContainer container : containerList) {
+ blockingQueue.add(container);
+ }
flushTimer.time();
- indexSizeCounter.dec(builderContainerList.size());
- execute();
+ indexSizeCounter.dec(containerList.size());
+ execute(config.isForcedRefresh());
}
- })
- .subscribe();
+ });
+ consumer.subscribe();
}
- public void put(IndexRequestBuilder builder){
+ @Override
+ public BetterFuture<ShardReplicationOperationRequestBuilder> put(IndexRequestBuilder builder){
+ RequestBuilderContainer container = new RequestBuilderContainer(builder);
metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
- producer.put(new RequestBuilderContainer(builder));
+ producer.put(container);
+ return container.getFuture();
}
- public void put(DeleteRequestBuilder builder){
+ @Override
+ public BetterFuture<ShardReplicationOperationRequestBuilder> put(DeleteRequestBuilder builder){
+ RequestBuilderContainer container = new RequestBuilderContainer(builder);
metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
- producer.put(new RequestBuilderContainer(builder));
+ producer.put(container);
+ return container.getFuture();
}
- public void flushAndRefresh(){
- execute(true);
- }
- public void flush(){
- execute();
- }
- private void execute(){
- execute(this.refresh);
- }
+ private static class RequestBuilderContainer{
+ private final ShardReplicationOperationRequestBuilder builder;
+ private final BetterFuture<ShardReplicationOperationRequestBuilder> containerFuture;
+
+ public RequestBuilderContainer(ShardReplicationOperationRequestBuilder builder){
+ final RequestBuilderContainer parent = this;
+ this.builder = builder;
+ this.containerFuture = new BetterFuture<>(new Callable<ShardReplicationOperationRequestBuilder>() {
+ @Override
+ public ShardReplicationOperationRequestBuilder call() throws Exception {
+ return parent.getBuilder();
+ }
+ });
+ }
+
+ public ShardReplicationOperationRequestBuilder getBuilder(){
+ return builder;
+ }
+ private void done(){
+ containerFuture.done();
+ }
+ public BetterFuture<ShardReplicationOperationRequestBuilder> getFuture(){
+ return containerFuture;
+ }
+ }
/**
* Execute the request, check for errors, then re-init the batch for future use
*/
- private synchronized void execute(boolean refresh ) {
- try {
- BulkRequestBuilder bulkRequest = client.prepareBulk();
- bulkRequest.setRefresh(refresh);
- int count = bufferSize;
- while (blockingQueue.size() > 0 && count-- > 0) {
- RequestBuilderContainer container = (RequestBuilderContainer)blockingQueue.take();
- ShardReplicationOperationRequestBuilder builder = container.getBuilder();
- if (builder instanceof IndexRequestBuilder) {
- bulkRequest.add((IndexRequestBuilder) builder);
- }
- if (builder instanceof DeleteRequestBuilder) {
- bulkRequest.add((DeleteRequestBuilder) builder);
- }
+ private void execute( boolean refresh) {
+
+ if (blockingQueue.size() == 0) {
+ return;
+ }
+
+ BulkRequestBuilder bulkRequest = initRequest(refresh);
+
+ Collection<RequestBuilderContainer> containerCollection = new ArrayList<>(config.getIndexBatchSize());
+ blockingQueue.drainTo(containerCollection);
+ int count = 0;
+ //clear the queue or proceed to buffersize
+ for (RequestBuilderContainer container : containerCollection) {
+
+ ShardReplicationOperationRequestBuilder builder = container.getBuilder();
+ //only handle two types of requests for now, annoyingly there is no base class implementation on BulkRequest
+ if (builder instanceof IndexRequestBuilder) {
+ bulkRequest.add((IndexRequestBuilder) builder);
}
- //nothing to do, we haven't added anthing to the index
- if (bulkRequest.numberOfActions() == 0) {
- return;
+ if (builder instanceof DeleteRequestBuilder) {
+ bulkRequest.add((DeleteRequestBuilder) builder);
}
- final BulkResponse responses;
+ if (count++ == config.getIndexBatchSize()) {
+ sendRequest(bulkRequest);
+ bulkRequest = initRequest(refresh);
- try {
- responses = bulkRequest.execute().actionGet();
- } catch (Throwable t) {
- log.error("Unable to communicate with elasticsearch");
- failureMonitor.fail("Unable to execute batch", t);
- throw t;
}
+ }
+ sendRequest(bulkRequest);
+ for (RequestBuilderContainer container : containerCollection) {
+ container.done();
+ }
+ }
- failureMonitor.success();
+ private BulkRequestBuilder initRequest(boolean refresh) {
+ BulkRequestBuilder bulkRequest = client.prepareBulk();
+ bulkRequest.setConsistencyLevel(WriteConsistencyLevel.fromString(config.getWriteConsistencyLevel()));
+ bulkRequest.setRefresh(refresh);
+ return bulkRequest;
+ }
- for (BulkItemResponse response : responses) {
- if (response.isFailed()) {
- throw new RuntimeException("Unable to index documents. Errors are :"
- + response.getFailure().getMessage());
- }
+ private void sendRequest(BulkRequestBuilder bulkRequest) {
+ //nothing to do, we haven't added anthing to the index
+ if (bulkRequest.numberOfActions() == 0) {
+ return;
+ }
+
+ final BulkResponse responses;
+
+ try {
+ responses = bulkRequest.execute().actionGet();
+ } catch (Throwable t) {
+ log.error("Unable to communicate with elasticsearch");
+ failureMonitor.fail("Unable to execute batch", t);
+ throw t;
+ }
+
+ failureMonitor.success();
+
+ for (BulkItemResponse response : responses) {
+ if (response.isFailed()) {
+ throw new RuntimeException("Unable to index documents. Errors are :"
+ + response.getFailure().getMessage());
}
- }catch (InterruptedException ie){
- log.error("Problem taking messages off of queue",ie);
- throw new RuntimeException(ie);
}
}
+
private static class Producer implements Observable.OnSubscribe<RequestBuilderContainer> {
private Subscriber<? super RequestBuilderContainer> subscriber;
@@ -188,16 +223,4 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
}
}
- private static class RequestBuilderContainer{
- private final ShardReplicationOperationRequestBuilder builder;
-
- public RequestBuilderContainer(ShardReplicationOperationRequestBuilder builder){
- this.builder = builder;
- }
-
- public ShardReplicationOperationRequestBuilder getBuilder(){
- return builder;
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 736eb9b..6cda9f2 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -21,6 +21,8 @@ package org.apache.usergrid.persistence.index.impl;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.persistence.index.query.CandidateResult;
@@ -53,9 +55,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.*;
@RunWith(EsRunner.class)
@@ -64,12 +64,9 @@ public class EntityIndexTest extends BaseIT {
private static final Logger log = LoggerFactory.getLogger( EntityIndexTest.class );
-
@Inject
public EntityIndexFactory eif;
-
-
@Test
public void testIndex() throws IOException {
Id appId = new SimpleId( "application" );
@@ -91,6 +88,49 @@ public class EntityIndexTest extends BaseIT {
}
@Test
+ public void testIndexThreads() throws IOException {
+ final Id appId = new SimpleId( "application" );
+
+ final ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
+
+ long now = System.currentTimeMillis();
+ final int threads = 1000;
+ final EntityIndex entityIndex = eif.createEntityIndex( applicationScope );
+ final IndexScope indexScope = new IndexScopeImpl(appId, "things");
+ final String entityType = "thing";
+ entityIndex.initializeIndex();
+ final CountDownLatch latch = new CountDownLatch(threads);
+ final AtomicLong failTime=new AtomicLong(0);
+ for(int i=0;i<threads;i++) {
+ Thread thread = new Thread(new Runnable() {
+ public void run() {
+ try {
+ insertJsonBlob(entityIndex, entityType, indexScope, "/sample-small.json", 1, 0);
+ entityIndex.refresh();
+ } catch (Exception e) {
+ synchronized (failTime) {
+ if (failTime.get() == 0) {
+ failTime.set(System.currentTimeMillis());
+ }
+ }
+ System.out.println(e.toString());
+ fail("threw exception");
+ }finally {
+ latch.countDown();
+ }
+ }
+ });
+ thread.start();
+ }
+ try {
+ latch.await();
+ }catch (InterruptedException ie){
+ throw new RuntimeException(ie);
+ }
+ assertTrue("system must have failed at "+(failTime.get() - now) ,failTime.get()==0);
+ }
+
+ @Test
public void testMultipleIndexInitializations(){
Id appId = new SimpleId( "application" );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/main/java/org/apache/usergrid/services/guice/ServiceModule.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/guice/ServiceModule.java b/stack/services/src/main/java/org/apache/usergrid/services/guice/ServiceModule.java
index 9ea6142..b61b18c 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/guice/ServiceModule.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/guice/ServiceModule.java
@@ -20,7 +20,6 @@ package org.apache.usergrid.services.guice;
import org.apache.usergrid.corepersistence.CoreModule;
-import org.apache.usergrid.metrics.MetricsFactory;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.services.ServiceManager;
import org.apache.usergrid.services.ServiceManagerFactory;
@@ -59,7 +58,6 @@ public class ServiceModule extends AbstractModule {
//Seems weird, aren't we just binding the factory to the exact same factory when it goes to look for it?
bind( ServiceManagerFactory.class );
bind( EntityManagerFactory.class );
- bind( MetricsFactory.class );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 2d2ac1d..5eed002 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -23,9 +23,9 @@ import com.codahale.metrics.Timer;
import com.google.inject.Injector;
import org.apache.usergrid.corepersistence.CpSetup;
-import org.apache.usergrid.metrics.MetricsFactory;
import org.apache.usergrid.mq.Message;
import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.entities.Notification;
import org.apache.usergrid.persistence.entities.Notifier;
import org.apache.usergrid.persistence.entities.Receipt;
@@ -86,7 +86,7 @@ public class NotificationsService extends AbstractCollectionService {
emf = getApplicationContext().getBean(EntityManagerFactory.class);
Properties props = (Properties)getApplicationContext().getBean("properties");
- metricsService = getApplicationContext().getBean(MetricsFactory.class);
+ metricsService = getApplicationContext().getBean(Injector.class).getInstance(MetricsFactory.class);
postMeter = metricsService.getMeter(NotificationsService.class, "requests");
postTimer = metricsService.getTimer(this.getClass(), "execution_rest");
JobScheduler jobScheduler = new JobScheduler(sm,em);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java
index 86c7b44..6fc7b7a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java
@@ -21,7 +21,10 @@ import java.util.UUID;
import javax.annotation.PostConstruct;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.entities.Notification;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -29,7 +32,6 @@ import org.springframework.stereotype.Component;
import org.apache.usergrid.batch.JobExecution;
import org.apache.usergrid.batch.job.OnlyOnceJob;
-import org.apache.usergrid.metrics.MetricsFactory;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.entities.JobData;
@@ -46,7 +48,6 @@ public class QueueJob extends OnlyOnceJob {
private static final Logger logger = LoggerFactory.getLogger( QueueJob.class );
- @Autowired
private MetricsFactory metricsService;
@Autowired
@@ -68,6 +69,7 @@ public class QueueJob extends OnlyOnceJob {
@PostConstruct
void init() {
+ metricsService = this.smf.getApplicationContext().getBean( Injector.class ).getInstance(MetricsFactory.class);
histogram = metricsService.getHistogram( QueueJob.class, "cycle" );
requests = metricsService.getMeter( QueueJob.class, "requests" );
execution = metricsService.getTimer( QueueJob.class, "execution" );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 94ebf65..5247a25 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -22,11 +22,11 @@ import com.google.common.cache.*;
import com.google.inject.Injector;
import org.apache.usergrid.corepersistence.CpSetup;
-import org.apache.usergrid.metrics.MetricsFactory;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.queue.*;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.services.ServiceManager;
@@ -76,11 +76,11 @@ public class QueueListener {
public QueueManager TEST_QUEUE_MANAGER;
private int consecutiveCallsToRemoveDevices;
- public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, MetricsFactory metricsService, Properties props){
+ public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, Properties props){
this.queueManagerFactory = smf.getApplicationContext().getBean( Injector.class ).getInstance(QueueManagerFactory.class);
this.smf = smf;
this.emf = emf;
- this.metricsService = metricsService;
+ this.metricsService = smf.getApplicationContext().getBean( Injector.class ).getInstance(MetricsFactory.class);
this.properties = props;
this.queueScopeFactory = smf.getApplicationContext().getBean( Injector.class ).getInstance(QueueScopeFactory.class);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 8cc3450..5b1a6b3 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -20,8 +20,8 @@ import com.clearspring.analytics.hash.MurmurHash;
import com.clearspring.analytics.stream.frequency.CountMinSketch;
import com.codahale.metrics.Meter;
import org.apache.usergrid.batch.JobExecution;
-import org.apache.usergrid.metrics.MetricsFactory;
import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.entities.Device;
import org.apache.usergrid.persistence.entities.Notification;
import org.apache.usergrid.persistence.entities.Notifier;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
index 61805c7..a4b0785 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
@@ -20,13 +20,14 @@ package org.apache.usergrid.services.queues;
import java.util.List;
import java.util.Properties;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.apache.usergrid.management.importer.ImportService;
import org.apache.usergrid.management.importer.ImportServiceImpl;
-import org.apache.usergrid.metrics.MetricsFactory;
+
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.queue.QueueMessage;
import org.apache.usergrid.services.ServiceManagerFactory;
@@ -53,8 +54,8 @@ public class ImportQueueListener extends QueueListener {
@Inject
public ImportQueueListener( final ServiceManagerFactory smf, final EntityManagerFactory emf,
- final MetricsFactory metricsService, final Injector injector, final Properties props ) {
- super( smf, emf, metricsService, injector, props );
+ final Injector injector, final Properties props ) {
+ super( smf, emf, injector, props );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
index 744a8db..8277e86 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
@@ -22,11 +22,11 @@ import com.google.common.cache.*;
import com.google.inject.Injector;
import org.apache.usergrid.corepersistence.CpSetup;
-import org.apache.usergrid.metrics.MetricsFactory;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.queue.*;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.services.ServiceManager;
@@ -84,15 +84,14 @@ public abstract class QueueListener {
* Initializes the QueueListener.
* @param smf
* @param emf
- * @param metricsService
* @param props
*/
- public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, MetricsFactory metricsService, Injector injector, Properties props){
+ public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, Injector injector, Properties props){
//TODO: change current injectors to use service module instead of CpSetup
this.queueManagerFactory = injector.getInstance( QueueManagerFactory.class );
this.smf = smf;
this.emf = injector.getInstance( EntityManagerFactory.class ); //emf;
- this.metricsService = metricsService;
+ this.metricsService = injector.getInstance(MetricsFactory.class);
this.properties = props;
this.queueScopeFactory = injector.getInstance(QueueScopeFactory.class);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/main/resources/usergrid-services-context.xml
----------------------------------------------------------------------
diff --git a/stack/services/src/main/resources/usergrid-services-context.xml b/stack/services/src/main/resources/usergrid-services-context.xml
index 1c2eb63..e97481b 100644
--- a/stack/services/src/main/resources/usergrid-services-context.xml
+++ b/stack/services/src/main/resources/usergrid-services-context.xml
@@ -92,7 +92,6 @@
<bean id="notificationsQueueListener" class="org.apache.usergrid.services.notifications.QueueListener"
scope="singleton">
<constructor-arg name="emf" ref="entityManagerFactory" />
- <constructor-arg name="metricsService" ref="metricsFactory" />
<constructor-arg name="props" ref="properties" />
<constructor-arg name="smf" ref="serviceManagerFactory" />
</bean>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index 5fda74b..95418ee 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -124,7 +124,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
app.getEntityManager().refreshIndex();
- listener = new QueueListener(ns.getServiceManagerFactory(),ns.getEntityManagerFactory(),ns.getMetricsFactory(), new Properties());
+ listener = new QueueListener(ns.getServiceManagerFactory(),ns.getEntityManagerFactory(), new Properties());
listener.TEST_QUEUE_MANAGER = qm;
listener.DEFAULT_SLEEP = 200;
listener.start();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index 8d6fb70..ee00c88 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -101,7 +101,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
DefaultQueueManager qm = new DefaultQueueManager();
ns.TEST_QUEUE_MANAGER = qm;
- listener = new QueueListener(ns.getServiceManagerFactory(), ns.getEntityManagerFactory(),ns.getMetricsFactory(), new Properties());
+ listener = new QueueListener(ns.getServiceManagerFactory(), ns.getEntityManagerFactory(), new Properties());
listener.DEFAULT_SLEEP = 200;
listener.TEST_QUEUE_MANAGER = qm;
listener.start();
@@ -186,7 +186,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
Entity device = app.testRequest(ServiceAction.POST, 1, "users", user.getUuid(), "devices", device1.getUuid()).getEntity();
assertEquals(device.getUuid(), device1.getUuid());
- // create and post notification
+ // create and post notification
String payload = "Hello, World!";
Map<String, String> payloads = new HashMap<String, String>(1);
payloads.put(notifier.getUuid().toString(), payload);