You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/30 21:32:32 UTC
[08/50] [abbrv] usergrid git commit: merge from 1048
merge from 1048
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5eed63d4
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5eed63d4
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5eed63d4
Branch: refs/heads/USERGRID-872
Commit: 5eed63d43999c8f381e2fbcbe7d2c1fdb6b73729
Parents: 68befd2 4013f17
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 19 15:19:33 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 19 15:19:33 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 377 ++++++++----
.../asyncevents/AsyncEventService.java | 1 +
.../asyncevents/AsyncIndexProvider.java | 14 +-
.../asyncevents/model/AsyncEvent.java | 8 +-
.../model/ElasticsearchIndexEvent.java | 50 ++
.../index/IndexProcessorFig.java | 17 +
.../util/ObjectJsonSerializer.java | 92 +++
.../index/AmazonAsyncEventServiceTest.java | 6 +-
.../index/AsyncIndexServiceTest.java | 2 +-
.../usergrid/persistence/map/MapManager.java | 25 +-
.../persistence/map/impl/MapManagerImpl.java | 6 +
.../persistence/map/impl/MapSerialization.java | 27 +-
.../map/impl/MapSerializationImpl.java | 265 ++++----
.../index/impl/DeIndexOperation.java | 4 +
.../persistence/index/impl/IndexOperation.java | 4 +
.../index/impl/IndexOperationMessage.java | 5 +
.../persistence/queue/LocalQueueManager.java | 12 +-
.../persistence/queue/QueueManager.java | 8 +-
.../persistence/queue/guice/QueueModule.java | 1 -
.../queue/impl/SNSQueueManagerImpl.java | 615 +++++++++++--------
.../queue/impl/SQSQueueManagerImpl.java | 352 -----------
.../services/queues/ImportQueueManager.java | 9 +-
22 files changed, 1017 insertions(+), 883 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5eed63d4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5eed63d4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 1ad9b86,2bace8d..aafb63a
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@@ -27,7 -27,7 +27,8 @@@ import org.apache.usergrid.persistence.
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
+import org.apache.usergrid.persistence.queue.LocalQueueManager;
+ import org.apache.usergrid.persistence.map.MapManagerFactory;
import org.apache.usergrid.persistence.queue.QueueFig;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
@@@ -99,16 -103,12 +104,15 @@@ public class AsyncIndexProvider impleme
switch (impl) {
case LOCAL:
- return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
+ AmazonAsyncEventService eventService = new AmazonAsyncEventService(scope -> new LocalQueueManager(), indexProcessorFig, indexProducer, metricsFactory,
- entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler,queueFig);
++ entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder,mapManagerFactory, queueFig,rxTaskScheduler);
+ eventService.MAX_TAKE = 1000;
+ return eventService;
case SQS:
- return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler,queueFig );
+ throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead with only a single region");
case SNS:
return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler, queueFig);
+ entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler );
default:
throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5eed63d4/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 8a8c8a5,ec9b315..69d5e18
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@@ -86,7 -86,32 +86,24 @@@ public interface IndexProcessorFig exte
String getQueueImplementation();
@Default("1000")
- @Key("elasticsearch.reindex.flush.interval")
- int getUpdateInterval();
-
- @Default("100")
- @Key("elasticsearch.buffer.time_ms")
- int getBufferTime();
-
- @Default("1000")
- @Key( REINDEX_BUFFER_SIZE )
+ @Key(REINDEX_BUFFER_SIZE)
int getReindexBufferSize();
+ /**
+ * Flag to resolve the LOCAL queue implementation service synchronously.
+ */
+ @Default("false")
+ @Key("elasticsearch.queue_impl.resolution")
+ boolean resolveSynchronously();
+
+ /**
+ * Get the message TTL in milliseconds. Defaults to 24 hours
+ *
+ * 24 * 60 * 60 * 1000
+ *
+ * @return
+ */
+ @Default("86400000")
+ @Key( "elasticsearch.message.ttl" )
+ int getIndexMessageTtl();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5eed63d4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
index 4b4218a,0000000..8be6099
mode 100644,000000..100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
@@@ -1,98 -1,0 +1,108 @@@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.persistence.queue;
+
+import rx.Observable;
+
+import java.io.IOException;
+import java.util.AbstractQueue;
++import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Default queue manager implementation, uses in memory linked queue
+ */
+public class LocalQueueManager implements QueueManager {
+ public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000);
+
+ @Override
+ public Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
+ List<QueueMessage> returnQueue = new ArrayList<>();
+ try {
+ QueueMessage message=null;
+ int count = 5;
+ do {
+ message = queue.poll(100, TimeUnit.MILLISECONDS);
+ if (message != null) {
+ returnQueue.add(message);
+ }
+ }while(message!=null && count-->0);
+ }catch (InterruptedException ie){
+ throw new RuntimeException(ie);
+ }
+ return Observable.from( returnQueue);
+ }
+
+ @Override
+ public long getQueueDepth() {
+ return queue.size();
+ }
+
+ @Override
+ public void commitMessage(QueueMessage queueMessage) {
+ }
+
+ @Override
+ public void commitMessages(List<QueueMessage> queueMessages) {
+ }
+
+ @Override
+ public void sendMessages(List bodies) throws IOException {
+ for(Object body : bodies){
+ String uuid = UUID.randomUUID().toString();
+ try {
+ queue.put(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"));
+ }catch (InterruptedException ie){
+ throw new RuntimeException(ie);
+ }
+ }
+ }
+
++
+ @Override
- public void sendMessage(Object body) throws IOException {
++ public <T extends Serializable> void sendMessage( final T body ) throws IOException {
+ String uuid = UUID.randomUUID().toString();
+ try {
+ queue.put(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"));
+ }catch (InterruptedException ie){
+ throw new RuntimeException(ie);
+ }
+ }
+
++
++
++ @Override
++ public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
++ sendMessage( body );
++ }
++
++
+ @Override
+ public void deleteQueue() {
+
+ }
+}