You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/30 21:32:32 UTC

[08/50] [abbrv] usergrid git commit: merge from 1048

merge from 1048


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5eed63d4
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5eed63d4
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5eed63d4

Branch: refs/heads/USERGRID-872
Commit: 5eed63d43999c8f381e2fbcbe7d2c1fdb6b73729
Parents: 68befd2 4013f17
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 19 15:19:33 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 19 15:19:33 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 377 ++++++++----
 .../asyncevents/AsyncEventService.java          |   1 +
 .../asyncevents/AsyncIndexProvider.java         |  14 +-
 .../asyncevents/model/AsyncEvent.java           |   8 +-
 .../model/ElasticsearchIndexEvent.java          |  50 ++
 .../index/IndexProcessorFig.java                |  17 +
 .../util/ObjectJsonSerializer.java              |  92 +++
 .../index/AmazonAsyncEventServiceTest.java      |   6 +-
 .../index/AsyncIndexServiceTest.java            |   2 +-
 .../usergrid/persistence/map/MapManager.java    |  25 +-
 .../persistence/map/impl/MapManagerImpl.java    |   6 +
 .../persistence/map/impl/MapSerialization.java  |  27 +-
 .../map/impl/MapSerializationImpl.java          | 265 ++++----
 .../index/impl/DeIndexOperation.java            |   4 +
 .../persistence/index/impl/IndexOperation.java  |   4 +
 .../index/impl/IndexOperationMessage.java       |   5 +
 .../persistence/queue/LocalQueueManager.java    |  12 +-
 .../persistence/queue/QueueManager.java         |   8 +-
 .../persistence/queue/guice/QueueModule.java    |   1 -
 .../queue/impl/SNSQueueManagerImpl.java         | 615 +++++++++++--------
 .../queue/impl/SQSQueueManagerImpl.java         | 352 -----------
 .../services/queues/ImportQueueManager.java     |   9 +-
 22 files changed, 1017 insertions(+), 883 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/5eed63d4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5eed63d4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 1ad9b86,2bace8d..aafb63a
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@@ -27,7 -27,7 +27,8 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
  import org.apache.usergrid.persistence.index.EntityIndexFactory;
  import org.apache.usergrid.persistence.index.impl.IndexProducer;
 +import org.apache.usergrid.persistence.queue.LocalQueueManager;
+ import org.apache.usergrid.persistence.map.MapManagerFactory;
  import org.apache.usergrid.persistence.queue.QueueFig;
  import org.apache.usergrid.persistence.queue.QueueManagerFactory;
  
@@@ -99,16 -103,12 +104,15 @@@ public class AsyncIndexProvider impleme
  
          switch (impl) {
              case LOCAL:
 -                return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
 +                AmazonAsyncEventService eventService = new AmazonAsyncEventService(scope -> new LocalQueueManager(), indexProcessorFig, indexProducer, metricsFactory,
-                     entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler,queueFig);
++                    entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder,mapManagerFactory, queueFig,rxTaskScheduler);
 +                eventService.MAX_TAKE = 1000;
 +                return eventService;
              case SQS:
-                 return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
-                     entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler,queueFig );
+                 throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead with only a single region");
              case SNS:
                  return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
-                     entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler, queueFig);
+                     entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler );
              default:
                  throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
          }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5eed63d4/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 8a8c8a5,ec9b315..69d5e18
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@@ -86,7 -86,32 +86,24 @@@ public interface IndexProcessorFig exte
      String getQueueImplementation();
  
      @Default("1000")
 -    @Key("elasticsearch.reindex.flush.interval")
 -    int getUpdateInterval();
 -
 -    @Default("100")
 -    @Key("elasticsearch.buffer.time_ms")
 -    int getBufferTime();
 -
 -    @Default("1000")
 -    @Key( REINDEX_BUFFER_SIZE )
 +    @Key(REINDEX_BUFFER_SIZE)
      int getReindexBufferSize();
  
+     /**
+      * Flag to resolve the LOCAL queue implementation service synchronously.
+      */
+     @Default("false")
+     @Key("elasticsearch.queue_impl.resolution")
+     boolean resolveSynchronously();
+ 
+     /**
+      * Get the message TTL in milliseconds.  Defaults to 24 hours
+      *
+      * 24 * 60 * 60 * 1000 = 86400000
+      *
+      * @return the index message TTL in milliseconds
+      */
+     @Default("86400000")
+     @Key( "elasticsearch.message.ttl" )
+     int getIndexMessageTtl();
  }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5eed63d4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
index 4b4218a,0000000..8be6099
mode 100644,000000..100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
@@@ -1,98 -1,0 +1,108 @@@
 +/*
 + *
 + *  * Licensed to the Apache Software Foundation (ASF) under one or more
 + *  *  contributor license agreements.  The ASF licenses this file to You
 + *  * under the Apache License, Version 2.0 (the "License"); you may not
 + *  * use this file except in compliance with the License.
 + *  * You may obtain a copy of the License at
 + *  *
 + *  *     http://www.apache.org/licenses/LICENSE-2.0
 + *  *
 + *  * Unless required by applicable law or agreed to in writing, software
 + *  * distributed under the License is distributed on an "AS IS" BASIS,
 + *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + *  * See the License for the specific language governing permissions and
 + *  * limitations under the License.  For additional information regarding
 + *  * copyright in this work, please see the NOTICE file in the top level
 + *  * directory of this distribution.
 + *
 + */
 +
 +package org.apache.usergrid.persistence.queue;
 +
 +import rx.Observable;
 +
 +import java.io.IOException;
 +import java.util.AbstractQueue;
++import java.io.Serializable;
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.UUID;
 +import java.util.concurrent.ArrayBlockingQueue;
 +import java.util.concurrent.LinkedBlockingQueue;
 +import java.util.concurrent.PriorityBlockingQueue;
 +import java.util.concurrent.TimeUnit;
 +
 +/**
 + * Default queue manager implementation, uses an in-memory bounded ArrayBlockingQueue (capacity 10000)
 + */
 +public class LocalQueueManager implements QueueManager {
 +    public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000);
 +
 +    @Override
 +    public    Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
 +        List<QueueMessage> returnQueue = new ArrayList<>();
 +        try {
 +            QueueMessage message=null;
 +            int count = 5;
 +            do {
 +                message = queue.poll(100, TimeUnit.MILLISECONDS);
 +                if (message != null) {
 +                    returnQueue.add(message);
 +                }
 +            }while(message!=null && count-->0);
 +        }catch (InterruptedException ie){
 +            throw new RuntimeException(ie);
 +        }
 +        return Observable.from( returnQueue);
 +    }
 +
 +    @Override
 +    public long getQueueDepth() {
 +        return queue.size();
 +    }
 +
 +    @Override
 +    public void commitMessage(QueueMessage queueMessage) {
 +    }
 +
 +    @Override
 +    public void commitMessages(List<QueueMessage> queueMessages) {
 +    }
 +
 +    @Override
 +    public  void sendMessages(List bodies) throws IOException {
 +        for(Object body : bodies){
 +            String uuid = UUID.randomUUID().toString();
 +            try {
 +                queue.put(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"));
 +            }catch (InterruptedException ie){
 +                throw new RuntimeException(ie);
 +            }
 +        }
 +    }
 +
++
 +    @Override
-     public  void sendMessage(Object body) throws IOException {
++    public <T extends Serializable> void sendMessage( final T body ) throws IOException {
 +        String uuid = UUID.randomUUID().toString();
 +        try {
 +            queue.put(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"));
 +        }catch (InterruptedException ie){
 +            throw new RuntimeException(ie);
 +        }
 +    }
 +
++
++
++    @Override
++    public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
++       sendMessage( body );
++    }
++
++
 +    @Override
 +    public void deleteQueue() {
 +
 +    }
 +}