You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/16 22:34:54 UTC
[25/25] usergrid git commit: Changes to get Qakka using same injector
as rest of Usergrid
Changes to get Qakka using same injector as rest of Usergrid
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/18e4305b
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/18e4305b
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/18e4305b
Branch: refs/heads/usergrid-1318-queue
Commit: 18e4305b995be88947b72172dd22056702659a8e
Parents: 832b505
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Sep 16 18:33:10 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Sep 16 18:33:10 2016 -0400
----------------------------------------------------------------------
.../corepersistence/CpEntityManagerFactory.java | 3 ++
.../usergrid/persistence/core/CassandraFig.java | 2 +-
.../index/impl/EsIndexProducerImpl.java | 2 --
.../apache/usergrid/persistence/qakka/App.java | 12 +++----
.../usergrid/persistence/qakka/QakkaModule.java | 6 ++--
.../impl/DistributedQueueServiceImpl.java | 35 ++++++++++++++------
6 files changed, 38 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/18e4305b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 5d8c417..4bec92d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -60,6 +60,7 @@ import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.persistence.qakka.App;
import org.apache.usergrid.persistence.qakka.distributed.impl.QueueActorRouterProducer;
import org.apache.usergrid.persistence.qakka.distributed.impl.QueueSenderRouterProducer;
import org.apache.usergrid.persistence.qakka.distributed.impl.QueueWriterRouterProducer;
@@ -151,6 +152,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
try {
logger.info("Akka cluster starting...");
+ // TODO: fix this kludge
+ injector.getInstance( App.class );
this.actorSystemManager = injector.getInstance( ActorSystemManager.class );
actorSystemManager.registerRouterProducer( injector.getInstance( UniqueValuesService.class ) );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/18e4305b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
index b599a20..bc8d087 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
@@ -91,7 +91,7 @@ public interface CassandraFig extends GuicyFig {
@Default( "Usergrid_Applications" )
String getApplicationKeyspace();
- @Key( "cassandra.keyspace.application_local" )
+ @Key( "cassandra.keyspace.application.local" )
@Default( "Usergrid_Applications_Local" )
String getApplicationLocalKeyspace();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/18e4305b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
index 10d5e4a..8f58ef7 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
@@ -57,7 +57,6 @@ public class EsIndexProducerImpl implements IndexProducer {
private final IndexFig config;
private final FailureMonitorImpl failureMonitor;
private final Client client;
- private final Timer flushTimer;
private final IndexFig indexFig;
private final Counter indexSizeCounter;
private final Histogram roundtripTimer;
@@ -70,7 +69,6 @@ public class EsIndexProducerImpl implements IndexProducer {
@Inject
public EsIndexProducerImpl(final IndexFig config, final EsProvider provider,
final MetricsFactory metricsFactory, final IndexFig indexFig) {
- this.flushTimer = metricsFactory.getTimer(EsIndexProducerImpl.class, "index_buffer.flush");
this.indexSizeCounter = metricsFactory.getCounter(EsIndexProducerImpl.class, "index_buffer.size");
this.roundtripTimer = metricsFactory.getHistogram(EsIndexProducerImpl.class, "index_buffer.message_cycle");
http://git-wip-us.apache.org/repos/asf/usergrid/blob/18e4305b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
index 9d9c972..41bc6fa 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
@@ -59,12 +59,12 @@ public class App implements MetricsService {
this.actorSystemFig = actorSystemFig;
this.actorSystemManager = actorSystemManager;
this.distributedQueueService = distributedQueueService;
-
- try {
- migrationManager.migrate();
- } catch (MigrationException e) {
- throw new QakkaRuntimeException( "Error running migration", e );
- }
+//
+// try {
+// migrationManager.migrate();
+// } catch (MigrationException e) {
+// throw new QakkaRuntimeException( "Error running migration", e );
+// }
}
/**
http://git-wip-us.apache.org/repos/asf/usergrid/blob/18e4305b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
index 0c37e82..d1d8d7e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
@@ -64,8 +64,8 @@ public class QakkaModule extends AbstractModule {
// TODO: reconcile with usergrid props
// load properties from one properties file using Netflix Archaius so that GuicyFig will see them
ConfigurationManager.loadCascadedPropertiesFromResources( "qakka" );
- } catch (IOException e) {
- logger.warn("Unable to load qakka.properties");
+ } catch (Throwable t) {
+ logger.warn("Unable to load qakka.properties (can be ignored in Usergrid)");
}
}
@@ -105,11 +105,11 @@ public class QakkaModule extends AbstractModule {
Multibinder<Migration> migrationBinder = Multibinder.newSetBinder( binder(), Migration.class );
migrationBinder.addBinding().to( Key.get( AuditLogSerialization.class ) );
- //migrationBinder.addBinding().to( Key.get( MessageCounterSerialization.class ) );
migrationBinder.addBinding().to( Key.get( QueueMessageSerialization.class ) );
migrationBinder.addBinding().to( Key.get( QueueSerialization.class ) );
migrationBinder.addBinding().to( Key.get( ShardCounterSerialization.class ) );
migrationBinder.addBinding().to( Key.get( ShardSerialization.class ) );
migrationBinder.addBinding().to( Key.get( TransferLogSerialization.class ) );
+ //migrationBinder.addBinding().to( Key.get( MessageCounterSerialization.class ) );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/18e4305b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 1243c23..ec667e6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -25,6 +25,7 @@ import akka.util.Timeout;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
+import org.apache.usergrid.persistence.actorsystem.ClientActor;
import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.QueueManager;
import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
@@ -185,23 +186,37 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
// ask ClientActor and wait (up to timeout) for response
Future<Object> fut = Patterns.ask( actorSystemManager.getClientActor(), request, t );
- final QakkaMessage response = (QakkaMessage)Await.result( fut, t.duration() );
+ Object responseObject = Await.result( fut, t.duration() );
+
+ if ( responseObject instanceof QakkaMessage ) {
- if ( response != null && response instanceof QueueGetResponse) {
- QueueGetResponse qprm = (QueueGetResponse)response;
- if ( qprm.isSuccess() ) {
- if (retries > 1) {
- logger.debug( "getNextMessage SUCCESS after {} retries", retries );
+ final QakkaMessage response = (QakkaMessage)Await.result( fut, t.duration() );
+
+ if ( response != null && response instanceof QueueGetResponse) {
+ QueueGetResponse qprm = (QueueGetResponse)response;
+ if ( qprm.isSuccess() ) {
+ if (retries > 1) {
+ logger.debug( "getNextMessage SUCCESS after {} retries", retries );
+ }
}
+ return qprm.getQueueMessages();
+
+
+ } else if ( response != null ) {
+ logger.debug("ERROR RESPONSE (1) popping queue, retrying {}", retries );
+
+ } else {
+ logger.debug("TIMEOUT popping to queue, retrying {}", retries );
}
- return qprm.getQueueMessages();
+ } else if ( responseObject instanceof ClientActor.ErrorResponse ) {
- } else if ( response != null ) {
- logger.debug("ERROR RESPONSE popping queue, retrying {}", retries );
+ final ClientActor.ErrorResponse errorResponse = (ClientActor.ErrorResponse)responseObject;
+ logger.debug("ACTORSYSTEM ERROR popping queue: {}, retrying {}",
+ errorResponse.getMessage(), retries );
} else {
- logger.debug("TIMEOUT popping to queue, retrying {}", retries );
+ logger.debug("UNKNOWN RESPONSE popping queue, retrying {}", retries );
}
} catch ( Exception e ) {