You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/20 21:17:39 UTC
[01/11] usergrid git commit: Updates the defaults to be more sensible
in a multi-region environment
Repository: usergrid
Updated Branches:
refs/heads/remove-inmemory-event-service 5eed63d43 -> fad8ecdb0
Updates the defaults to be more sensible in a multi-region environment
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3ec0f588
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3ec0f588
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3ec0f588
Branch: refs/heads/remove-inmemory-event-service
Commit: 3ec0f5886b82737d4a7ed64fae01afbdb6707763
Parents: 4013f17
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 19 17:44:39 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 19 17:44:39 2015 -0600
----------------------------------------------------------------------
.../corepersistence/index/PublishRxTest.java | 95 ----------------
.../usergrid/corepersistence/index/RxTest.java | 108 +++++++++++++++++++
.../persistence/core/astyanax/CassandraFig.java | 6 +-
3 files changed, 111 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ec0f588/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxTest.java
deleted file mode 100644
index 973a42d..0000000
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import rx.Observable;
-import rx.Subscription;
-import rx.observables.ConnectableObservable;
-import rx.schedulers.Schedulers;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-/**
- * Test to test some assumptions about RX behaviors
- */
-public class PublishRxTest {
-
- @Test
- public void testPublish() throws InterruptedException {
-
- final int count = 10;
-
- final CountDownLatch latch = new CountDownLatch( count );
-
- final Subscription connectedObservable =
- Observable.range( 0, count ).doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() )
- .subscribe();
-
-
- final boolean completed = latch.await( 5, TimeUnit.SECONDS );
-
- assertTrue( "publish1 behaves as expected", completed );
-
- final boolean completedSubscription = connectedObservable.isUnsubscribed();
-
- assertTrue( "Subscription complete", completedSubscription );
- }
-
-
- @Test
- @Ignore("This seems like it should work, yet blocks forever")
- public void testConnectableObserver() throws InterruptedException {
-
- final int count = 10;
-
- final CountDownLatch latch = new CountDownLatch( count );
-
- final ConnectableObservable<Integer> connectedObservable = Observable.range( 0, count ).publish();
-
-
- //connect to our latch, which should run on it's own subscription
- //start our latch running
- connectedObservable.doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() ).subscribe();
-
-
- final Observable<Integer> countObservable = connectedObservable.subscribeOn( Schedulers.io() ).count();
-
- //start the sequence
- connectedObservable.connect();
-
-
- final boolean completed = latch.await( 5, TimeUnit.SECONDS );
-
- assertTrue( "publish1 behaves as expected", completed );
-
- final int returnedCount = countObservable.toBlocking().last();
-
- assertEquals( "Counts the same", count, returnedCount );
- }
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ec0f588/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
new file mode 100644
index 0000000..1d940d0
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import rx.Observable;
+import rx.Subscription;
+import rx.observables.ConnectableObservable;
+import rx.schedulers.Schedulers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Test to test some assumptions about RX behaviors
+ */
+public class RxTest {
+
+ @Test
+ public void testPublish() throws InterruptedException {
+
+ final int count = 10;
+
+ final CountDownLatch latch = new CountDownLatch( count );
+
+ final Subscription connectedObservable =
+ Observable.range( 0, count ).doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() )
+ .subscribe();
+
+
+ final boolean completed = latch.await( 5, TimeUnit.SECONDS );
+
+ assertTrue( "publish1 behaves as expected", completed );
+
+ final boolean completedSubscription = connectedObservable.isUnsubscribed();
+
+ assertTrue( "Subscription complete", completedSubscription );
+ }
+
+
+ @Test
+ @Ignore("This seems like it should work, yet blocks forever")
+ public void testConnectableObserver() throws InterruptedException {
+
+ final int count = 10;
+
+ final CountDownLatch latch = new CountDownLatch( count );
+
+ final ConnectableObservable<Integer> connectedObservable = Observable.range( 0, count ).publish();
+
+
+ //connect to our latch, which should run on its own subscription
+ //start our latch running
+ connectedObservable.doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() ).subscribe();
+
+
+ final Observable<Integer> countObservable = connectedObservable.subscribeOn( Schedulers.io() ).count();
+
+ //start the sequence
+ connectedObservable.connect();
+
+
+ final boolean completed = latch.await( 5, TimeUnit.SECONDS );
+
+ assertTrue( "publish1 behaves as expected", completed );
+
+ final int returnedCount = countObservable.toBlocking().last();
+
+ assertEquals( "Counts the same", count, returnedCount );
+ }
+
+
+ /**
+ * Tests that reduce emits its seed value when the source observable is empty
+ */
+ @Test
+ public void testReduceEmpty(){
+ final int result = Observable.range( 0, 100 ).filter( value -> value == -1 ).reduce( 0, ( integer, integer2 ) -> integer + 1 ).toBlocking().last();
+
+ assertEquals(0, result);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ec0f588/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
index 79c198f..e98e0fd 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
@@ -79,15 +79,15 @@ public interface CassandraFig extends GuicyFig {
String getDiscoveryType();
- @Default("CL_LOCAL_ONE")
+ @Default("CL_LOCAL_QUORUM")
@Key(READ_CL)
String getReadCL();
- @Default("CL_LOCAL_QUORUM")
+ @Default("CL_QUORUM")
@Key(READ_CONSISTENT_CL)
String getConsistentReadCL();
- @Default("CL_QUORUM")
+ @Default("CL_LOCAL_QUORUM")
@Key(WRITE_CL)
String getWriteCL();
[11/11] usergrid git commit: Merge branch 'master' into
remove-inmemory-event-service
Posted by sf...@apache.org.
Merge branch 'master' into remove-inmemory-event-service
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fad8ecdb
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fad8ecdb
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fad8ecdb
Branch: refs/heads/remove-inmemory-event-service
Commit: fad8ecdb08e58a9a72a9268763dfee48e87d2b55
Parents: 5eed63d 59edea1
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 20 13:16:30 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 20 13:16:30 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 15 ++-
.../model/ElasticsearchIndexEvent.java | 3 +-
.../corepersistence/index/PublishRxTest.java | 95 ----------------
.../usergrid/corepersistence/index/RxTest.java | 108 +++++++++++++++++++
.../core/astyanax/CassandraConfig.java | 8 +-
.../core/astyanax/CassandraConfigImpl.java | 1 +
.../persistence/core/astyanax/CassandraFig.java | 6 +-
.../map/impl/MapSerializationImpl.java | 37 ++-----
.../queue/impl/SNSQueueManagerImpl.java | 10 +-
9 files changed, 140 insertions(+), 143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fad8ecdb/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fad8ecdb/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
----------------------------------------------------------------------
diff --cc stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
index 0000000,1d940d0..d7b0bdb
mode 000000,100644..100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
@@@ -1,0 -1,108 +1,108 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ package org.apache.usergrid.corepersistence.index;
+
+
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.TimeUnit;
+
+ import org.junit.Ignore;
+ import org.junit.Test;
+
+ import rx.Observable;
+ import rx.Subscription;
+ import rx.observables.ConnectableObservable;
+ import rx.schedulers.Schedulers;
+
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+
+
+ /**
+ * Test to test some assumptions about RX behaviors
+ */
+ public class RxTest {
+
+ @Test
+ public void testPublish() throws InterruptedException {
+
+ final int count = 10;
+
+ final CountDownLatch latch = new CountDownLatch( count );
+
+ final Subscription connectedObservable =
+ Observable.range( 0, count ).doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() )
+ .subscribe();
+
+
- final boolean completed = latch.await( 5, TimeUnit.SECONDS );
++ final boolean completed = latch.await( 3, TimeUnit.SECONDS );
+
+ assertTrue( "publish1 behaves as expected", completed );
+
+ final boolean completedSubscription = connectedObservable.isUnsubscribed();
+
+ assertTrue( "Subscription complete", completedSubscription );
+ }
+
+
+ @Test
+ @Ignore("This seems like it should work, yet blocks forever")
+ public void testConnectableObserver() throws InterruptedException {
+
+ final int count = 10;
+
+ final CountDownLatch latch = new CountDownLatch( count );
+
+ final ConnectableObservable<Integer> connectedObservable = Observable.range( 0, count ).publish();
+
+
+ //connect to our latch, which should run on its own subscription
+ //start our latch running
+ connectedObservable.doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() ).subscribe();
+
+
+ final Observable<Integer> countObservable = connectedObservable.subscribeOn( Schedulers.io() ).count();
+
+ //start the sequence
+ connectedObservable.connect();
+
+
+ final boolean completed = latch.await( 5, TimeUnit.SECONDS );
+
+ assertTrue( "publish1 behaves as expected", completed );
+
+ final int returnedCount = countObservable.toBlocking().last();
+
+ assertEquals( "Counts the same", count, returnedCount );
+ }
+
+
+ /**
+ * Tests that reduce emits its seed value when the source observable is empty
+ */
+ @Test
+ public void testReduceEmpty(){
+ final int result = Observable.range( 0, 100 ).filter( value -> value == -1 ).reduce( 0, ( integer, integer2 ) -> integer + 1 ).toBlocking().last();
+
+ assertEquals(0, result);
+ }
+
+
+ }
[08/11] usergrid git commit: Merge remote-tracking branch
'origin/USERGRID-1048'
Posted by sf...@apache.org.
Merge remote-tracking branch 'origin/USERGRID-1048'
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5f62e4c1
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5f62e4c1
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5f62e4c1
Branch: refs/heads/remove-inmemory-event-service
Commit: 5f62e4c167ec12ec0378d65b162e3db0481f7d07
Parents: b51287c e50835f
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 20 11:43:47 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 20 11:43:47 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 2 +-
.../core/astyanax/CassandraConfig.java | 8 ++---
.../core/astyanax/CassandraConfigImpl.java | 1 +
.../map/impl/MapSerializationImpl.java | 37 ++++----------------
.../queue/impl/SNSQueueManagerImpl.java | 4 +--
5 files changed, 14 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
[03/11] usergrid git commit: Merge branch 'USERGRID-1048' of
https://git-wip-us.apache.org/repos/asf/usergrid into USERGRID-1048
Posted by sf...@apache.org.
Merge branch 'USERGRID-1048' of https://git-wip-us.apache.org/repos/asf/usergrid into USERGRID-1048
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d8e65721
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d8e65721
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d8e65721
Branch: refs/heads/remove-inmemory-event-service
Commit: d8e6572196b1c9245854ce1351d4d2171fbde80b
Parents: 3a7e60b 3ec0f58
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Oct 19 17:07:31 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Oct 19 17:07:31 2015 -0700
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 4 +
.../corepersistence/index/PublishRxTest.java | 95 ----------------
.../usergrid/corepersistence/index/RxTest.java | 108 +++++++++++++++++++
.../persistence/core/astyanax/CassandraFig.java | 6 +-
4 files changed, 115 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d8e65721/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
[04/11] usergrid git commit: Add null check back for batch sending of
SQS w/proper SQS client. Less logging would occur in the event of batch send
when the client is not initialized.
Posted by sf...@apache.org.
Add null check back for batch sending of SQS w/proper SQS client. Less logging would occur in the event of batch send when the client is not initialized.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fbb6c823
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fbb6c823
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fbb6c823
Branch: refs/heads/remove-inmemory-event-service
Commit: fbb6c823b7e2a1f5f55ab044942b38d1157970c0
Parents: d8e6572
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Oct 19 21:28:46 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Oct 19 21:28:46 2015 -0700
----------------------------------------------------------------------
.../usergrid/persistence/queue/impl/SNSQueueManagerImpl.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbb6c823/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 3c18992..1bb00dc 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -543,6 +543,11 @@ public class SNSQueueManagerImpl implements QueueManager {
@Override
public void sendMessages( final List bodies ) throws IOException {
+ if ( sqsAsync == null ) {
+ logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
+ return;
+ }
+
for ( Object body : bodies ) {
sendMessage( ( Serializable ) body );
}
[05/11] usergrid git commit: Merge branch '2.1-release' of
https://git-wip-us.apache.org/repos/asf/usergrid
Posted by sf...@apache.org.
Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a679aea4
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a679aea4
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a679aea4
Branch: refs/heads/remove-inmemory-event-service
Commit: a679aea4a46f29c60e64bfdf8e484149450c1fea
Parents: e7c805f a09485a
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 20 09:17:19 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 20 09:17:19 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 40 ++---
.../asyncevents/AsyncIndexProvider.java | 10 +-
.../asyncevents/model/AsyncEvent.java | 14 +-
.../asyncevents/model/EdgeDeleteEvent.java | 6 +-
.../asyncevents/model/EdgeIndexEvent.java | 9 +-
.../asyncevents/model/EntityDeleteEvent.java | 8 +-
.../asyncevents/model/EntityIndexEvent.java | 6 +-
.../model/InitializeApplicationIndexEvent.java | 4 +-
.../index/IndexProcessorFig.java | 7 +-
.../index/AmazonAsyncEventServiceTest.java | 6 +-
.../cache/CachedEntityCollectionManager.java | 147 -------------------
.../EntityCollectionManagerFactoryImpl.java | 6 -
.../usergrid/persistence/queue/QueueFig.java | 8 +-
.../queue/impl/SNSQueueManagerImpl.java | 8 +-
.../queue/impl/SQSQueueManagerImpl.java | 2 +-
.../queue/util/AmazonNotificationUtils.java | 4 +-
16 files changed, 74 insertions(+), 211 deletions(-)
----------------------------------------------------------------------
[06/11] usergrid git commit: Merge remote-tracking branch
'origin/USERGRID-1048'
Posted by sf...@apache.org.
Merge remote-tracking branch 'origin/USERGRID-1048'
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b51287cc
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b51287cc
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b51287cc
Branch: refs/heads/remove-inmemory-event-service
Commit: b51287cc2693ad2dea10ebe2197bce5dcd004992
Parents: a679aea fbb6c82
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 20 09:20:14 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 20 09:20:14 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 379 ++++++++----
.../asyncevents/AsyncEventService.java | 1 +
.../asyncevents/AsyncIndexProvider.java | 12 +-
.../asyncevents/model/AsyncEvent.java | 8 +-
.../model/ElasticsearchIndexEvent.java | 51 ++
.../index/IndexProcessorFig.java | 11 +
.../util/ObjectJsonSerializer.java | 92 +++
.../index/AmazonAsyncEventServiceTest.java | 6 +-
.../index/AsyncIndexServiceTest.java | 2 +-
.../corepersistence/index/PublishRxTest.java | 95 ---
.../usergrid/corepersistence/index/RxTest.java | 108 ++++
.../persistence/core/astyanax/CassandraFig.java | 6 +-
.../usergrid/persistence/map/MapManager.java | 25 +-
.../persistence/map/impl/MapManagerImpl.java | 6 +
.../persistence/map/impl/MapSerialization.java | 27 +-
.../map/impl/MapSerializationImpl.java | 265 ++++----
.../index/impl/DeIndexOperation.java | 4 +
.../persistence/index/impl/IndexOperation.java | 4 +
.../index/impl/IndexOperationMessage.java | 5 +
.../persistence/queue/DefaultQueueManager.java | 12 +-
.../persistence/queue/QueueManager.java | 8 +-
.../persistence/queue/guice/QueueModule.java | 1 -
.../queue/impl/SNSQueueManagerImpl.java | 617 +++++++++++--------
.../queue/impl/SQSQueueManagerImpl.java | 352 -----------
.../services/queues/ImportQueueManager.java | 9 +-
25 files changed, 1126 insertions(+), 980 deletions(-)
----------------------------------------------------------------------
[10/11] usergrid git commit: Merge branch 'USERGRID-1048' of
https://git-wip-us.apache.org/repos/asf/usergrid
Posted by sf...@apache.org.
Merge branch 'USERGRID-1048' of https://git-wip-us.apache.org/repos/asf/usergrid
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/59edea1c
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/59edea1c
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/59edea1c
Branch: refs/heads/remove-inmemory-event-service
Commit: 59edea1c2a8a2c182daad4ee7f3e42c295984b86
Parents: 5f62e4c 1fe1d1a
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 20 13:13:12 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 20 13:13:12 2015 -0600
----------------------------------------------------------------------
.../corepersistence/asyncevents/AmazonAsyncEventService.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
[02/11] usergrid git commit: Add sourceRegion to
ElasticsearchIndexEvent, fix logging statements,
update sqs/sns client null checks.
Posted by sf...@apache.org.
Add sourceRegion to ElasticsearchIndexEvent, fix logging statements, update sqs/sns client null checks.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3a7e60b3
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3a7e60b3
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3a7e60b3
Branch: refs/heads/remove-inmemory-event-service
Commit: 3a7e60b3131e207890354ca5fa84258795296372
Parents: 19d30ea
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Oct 19 17:07:18 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Oct 19 17:07:18 2015 -0700
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 9 ++++++---
.../asyncevents/model/ElasticsearchIndexEvent.java | 3 ++-
.../persistence/queue/impl/SNSQueueManagerImpl.java | 9 ++-------
3 files changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3a7e60b3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 67d0dab..2b583b5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -498,7 +498,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
//now queue up the index message
- final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId );
+ final ElasticsearchIndexEvent elasticsearchIndexEvent =
+ new ElasticsearchIndexEvent(queueFig.getPrimaryRegion(), newMessageId );
//send to the topic so all regions index the batch
@@ -520,12 +521,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
final IndexOperationMessage indexOperationMessage;
if(message == null){
- logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level" );
+ logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level",
+ messageId);
final String highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() );
if(highConsistency == null){
- logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" );
+ logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level",
+ messageId);
throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3a7e60b3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
index 207b15e..049c3a5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
@@ -35,7 +35,8 @@ public final class ElasticsearchIndexEvent extends AsyncEvent {
public ElasticsearchIndexEvent() {
}
- public ElasticsearchIndexEvent( UUID indexBatchId ) {
+ public ElasticsearchIndexEvent(String sourceRegion, UUID indexBatchId) {
+ super(sourceRegion);
this.indexBatchId = indexBatchId;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3a7e60b3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 58b2a4d..3c18992 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -543,11 +543,6 @@ public class SNSQueueManagerImpl implements QueueManager {
@Override
public void sendMessages( final List bodies ) throws IOException {
- if ( snsAsync == null ) {
- logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
- return;
- }
-
for ( Object body : bodies ) {
sendMessage( ( Serializable ) body );
}
@@ -557,8 +552,8 @@ public class SNSQueueManagerImpl implements QueueManager {
@Override
public <T extends Serializable> void sendMessage( final T body ) throws IOException {
- if ( snsAsync == null ) {
- logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
+ if ( sqsAsync == null ) {
+ logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
return;
}
[07/11] usergrid git commit: Fixes comments and refactors map to be a
cleaner read pattern
Posted by sf...@apache.org.
Fixes comments and refactors map to be a cleaner read pattern
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/e50835f1
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/e50835f1
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/e50835f1
Branch: refs/heads/remove-inmemory-event-service
Commit: e50835f1016a9988513a00b64337222957fa7747
Parents: fbb6c82
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Oct 20 09:34:43 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Oct 20 09:34:43 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 2 +-
.../core/astyanax/CassandraConfig.java | 8 ++---
.../core/astyanax/CassandraConfigImpl.java | 1 +
.../map/impl/MapSerializationImpl.java | 37 ++++----------------
.../queue/impl/SNSQueueManagerImpl.java | 4 +--
5 files changed, 14 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e50835f1/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 38c2966..6f779b5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -521,7 +521,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
final IndexOperationMessage indexOperationMessage;
if(message == null){
- logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level",
+ logger.warn( "Received message with id {} to process, unable to find it, reading with higher consistency level",
messageId);
final String highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e50835f1/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
index 817aee2..dba3646 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
@@ -34,25 +34,25 @@ public interface CassandraConfig {
* Get the currently configured ReadCL
* @return
*/
- public ConsistencyLevel getReadCL();
+ ConsistencyLevel getReadCL();
/**
* Get the currently configured ReadCL that is more consitent than getReadCL
* @return
*/
- public ConsistencyLevel getConsistentReadCL();
+ ConsistencyLevel getConsistentReadCL();
/**
* Get the currently configured write CL
* @return
*/
- public ConsistencyLevel getWriteCL();
+ ConsistencyLevel getWriteCL();
/**
* Return the number of shards that has been set in the property file
* @return
*/
- public int[] getShardSettings();
+ int[] getShardSettings();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e50835f1/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
index 17b91c6..7373322 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
@@ -83,6 +83,7 @@ public class CassandraConfigImpl implements CassandraConfig {
public ConsistencyLevel getConsistentReadCL() {
return consistentCl;
}
+
@Override
public ConsistencyLevel getWriteCL() {
return writeCl;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e50835f1/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index ffe10c9..ceeb3ad 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -129,14 +129,14 @@ public class MapSerializationImpl implements MapSerialization {
@Override
public String getString( final MapScope scope, final String key ) {
- Column<Boolean> col = getValue( scope, key );
+ Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL() );
return ( col != null ) ? col.getStringValue() : null;
}
@Override
public String getStringHighConsistency( final MapScope scope, final String key ) {
- Column<Boolean> col = getValueHighConsistency( scope, key ); // TODO: why boolean?
+ Column<Boolean> col = getValue( scope, key, cassandraConfig.getConsistentReadCL() ); // TODO: why boolean?
return ( col != null ) ? col.getStringValue() : null;
}
@@ -248,7 +248,7 @@ public class MapSerializationImpl implements MapSerialization {
@Override
public UUID getUuid( final MapScope scope, final String key ) {
- Column<Boolean> col = getValue( scope, key );
+ Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL() );
return ( col != null ) ? col.getUUIDValue() : null;
}
@@ -283,7 +283,7 @@ public class MapSerializationImpl implements MapSerialization {
@Override
public Long getLong( final MapScope scope, final String key ) {
- Column<Boolean> col = getValue( scope, key );
+ Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL() );
return ( col != null ) ? col.getLongValue() : null;
}
@@ -355,31 +355,7 @@ public class MapSerializationImpl implements MapSerialization {
}
- private Column<Boolean> getValue( MapScope scope, String key ) {
-
-
- //add it to the entry
- final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
-
- //now get all columns, including the "old row key value"
- try {
- final Column<Boolean> result =
- keyspace.prepareQuery( MAP_ENTRIES ).getKey( entryRowKey ).getColumn( true ).execute().getResult();
-
- return result;
- }
- catch ( NotFoundException nfe ) {
- //nothing to return
- return null;
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to connect to cassandra", e );
- }
- }
-
-
- private Column<Boolean> getValueHighConsistency( MapScope scope, String key ) {
-
+ private Column<Boolean> getValue( MapScope scope, String key, final ConsistencyLevel consistencyLevel ) {
//add it to the entry
final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
@@ -387,8 +363,7 @@ public class MapSerializationImpl implements MapSerialization {
//now get all columns, including the "old row key value"
try {
final Column<Boolean> result =
- keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( cassandraConfig.getConsistentReadCL() )
- .getKey( entryRowKey ).getColumn( true ).execute().getResult();
+ keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( consistencyLevel ).getKey( entryRowKey ).getColumn( true ).execute().getResult();
return result;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e50835f1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 1bb00dc..bc5f2f1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -438,7 +438,7 @@ public class SNSQueueManagerImpl implements QueueManager {
/**
* When a message originates from SNS it has a "Message" we have to extract
- * it and then process it seperately
+ * it and then process it separately
*/
@@ -546,7 +546,7 @@ public class SNSQueueManagerImpl implements QueueManager {
if ( sqsAsync == null ) {
logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
return;
- }
+ }
for ( Object body : bodies ) {
sendMessage( ( Serializable ) body );
[09/11] usergrid git commit: Fixes incorrect units on timeout from
millis to seconds.
Posted by sf...@apache.org.
Fixes incorrect units on timeout from millis to seconds.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1fe1d1a3
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1fe1d1a3
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1fe1d1a3
Branch: refs/heads/remove-inmemory-event-service
Commit: 1fe1d1a34e806ebfc53d0c6cb729e7e4aab804a1
Parents: e50835f
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Oct 20 11:37:03 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Oct 20 11:44:35 2015 -0600
----------------------------------------------------------------------
.../corepersistence/asyncevents/AmazonAsyncEventService.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1fe1d1a3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 6f779b5..7034a67 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -25,6 +25,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -491,8 +492,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
final UUID newMessageId = UUIDGenerator.newTimeUUID();
+ final int expirationTimeInSeconds =
+ ( int ) TimeUnit.MILLISECONDS.toSeconds( indexProcessorFig.getIndexMessageTtl() );
+
//write to the map in ES
- esMapPersistence.putString( newMessageId.toString(), jsonValue, indexProcessorFig.getIndexMessageTtl() );
+ esMapPersistence.putString( newMessageId.toString(), jsonValue, expirationTimeInSeconds );