You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/16 22:34:36 UTC
[07/25] usergrid git commit: Initial integration of Qakka into
Usergrid Queue module,
and a Qakka-based LegacyQueueManager implementation.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
new file mode 100644
index 0000000..572c897
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests for {@link ShardIterator}, run against the Cassandra client obtained from the test injector.
+ * Created by russo on 6/9/16.
+ */
+public class ShardIteratorTest extends AbstractTest {
+
+    private static final Logger logger = LoggerFactory.getLogger( ShardIteratorTest.class );
+
+    /** Expects an iterator with no starting shard ID to return both created shards, lowest ID first. */
+    @Test
+    public void getActiveShards(){
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient );
+
+        // unique queue name: shards written by other tests must not show up in this iteration
+        String queueName = "sit_queue_" + RandomStringUtils.randomAlphanumeric(20);
+
+        Shard shard1 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 100L, null);
+        Shard shard2 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 200L, null);
+
+        shardSerialization.createShard(shard1);
+        shardSerialization.createShard(shard2);
+
+        Iterator<Shard> shardIterator = new ShardIterator(
+            cassandraClient, queueName, "region1", Shard.Type.DEFAULT, Optional.empty());
+
+        List<Shard> shards = new ArrayList<>();
+        shardIterator.forEachRemaining(shard -> {
+            logger.info("Shard ID: {}", shard.getShardId());
+            shards.add(shard);
+        });
+
+        // one assertion per expectation, so a failure reports exactly what differed
+        Assert.assertEquals( 2, shards.size() );
+        Assert.assertEquals( shard1, shards.get(0) );
+        Assert.assertEquals( shard2, shards.get(1) );
+    }
+
+    /** Expects an iterator started at shard ID 200 to return only the shard after it (ID 300). */
+    @Test
+    public void seekActiveShards(){
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient );
+
+        String queueName = "sit_queue_" + RandomStringUtils.randomAlphanumeric(20); // unique per run
+
+        Shard shard1 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 100L, null);
+        Shard shard2 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 200L, null);
+        Shard shard3 = new Shard(queueName, "region1", Shard.Type.DEFAULT, 300L, null);
+
+        shardSerialization.createShard(shard1);
+        shardSerialization.createShard(shard2);
+        shardSerialization.createShard(shard3);
+
+        Iterator<Shard> shardIterator = new ShardIterator(
+            cassandraClient, queueName, "region1", Shard.Type.DEFAULT, Optional.of(200L));
+
+        List<Shard> shards = new ArrayList<>();
+        shardIterator.forEachRemaining(shard -> {
+            logger.info("Shard ID: {}", shard.getShardId());
+            shards.add(shard);
+        });
+
+        Assert.assertEquals( 1, shards.size() );
+        Assert.assertEquals( shard3, shards.get(0) );
+    }
+
+    /** Expects shards created one after another to be iterated in ascending pointer-timestamp order. */
+    @Test
+    public void shardIteratorOrdering() throws Exception {
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraClient );
+
+        int numShards = 10;
+        String region = "default";
+        String queueName = "sit_queue_" + RandomStringUtils.randomAlphanumeric(20);
+
+        for ( long i=0; i<numShards; i++) {
+            UUID messageId = QakkaUtils.getTimeUuid();
+            Shard shard = new Shard( queueName, region, Shard.Type.DEFAULT, i+1, messageId );
+            shardSerialization.createShard( shard );
+            // pause between shards; an interrupt now fails the test instead of being swallowed
+            Thread.sleep(10);
+        }
+
+        Iterator<Shard> shardIterator = new ShardIterator(
+            cassandraClient, queueName, region, Shard.Type.DEFAULT, Optional.empty());
+
+        int count = 0;
+        Long prevTimestamp = null;
+        while ( shardIterator.hasNext() ) {
+            Shard shard = shardIterator.next();
+            if ( prevTimestamp != null ) {
+                Assert.assertTrue( prevTimestamp < shard.getPointer().timestamp() );
+            }
+            prevTimestamp = shard.getPointer().timestamp();
+            count++;
+        }
+
+        Assert.assertEquals( numShards, count );
+    }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java
new file mode 100644
index 0000000..e1a541b
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+
+/**
+ * Exercises the create, delete, load and pointer-update calls of {@link ShardSerialization}.
+ * Created by russo on 6/8/16.
+ */
+public class ShardSerializationTest extends AbstractTest {
+
+    private static final Logger logger = LoggerFactory.getLogger( ShardSerializationTest.class );
+
+    /**
+     * Creating a shard should complete without throwing; nothing is read back.
+     */
+    @Test
+    public void writeNewShard(){
+
+        final CassandraClient client = getInjector().getInstance( CassandraClientImpl.class );
+        final ShardSerialization serialization = new ShardSerializationImpl( client );
+
+        serialization.createShard( new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null) );
+    }
+
+    /**
+     * A shard that was created and then deleted should load as null.
+     */
+    @Test
+    public void deleteShard(){
+
+        final CassandraClient client = getInjector().getInstance( CassandraClientImpl.class );
+        final ShardSerialization serialization = new ShardSerializationImpl( client );
+        final Shard target = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
+
+        serialization.createShard(target);
+        serialization.deleteShard(target);
+
+        final Shard reloaded = serialization.loadShard(target);
+        assertNull(reloaded);
+    }
+
+    /**
+     * Loading a shard that this test never created (queue "junk") should yield null.
+     */
+    @Test
+    public void loadNullShard(){
+
+        final CassandraClient client = getInjector().getInstance( CassandraClientImpl.class );
+        final ShardSerialization serialization = new ShardSerializationImpl( client );
+        final Shard neverWritten = new Shard("junk", "region1", Shard.Type.DEFAULT, 100L, null);
+
+        final Shard loaded = serialization.loadShard(neverWritten);
+        assertNull(loaded);
+    }
+
+    /**
+     * A pointer stored through updateShardPointer should be the pointer of the shard loaded afterwards.
+     */
+    @Test
+    public void updatePointer(){
+
+        final CassandraClient client = getInjector().getInstance( CassandraClientImpl.class );
+        final ShardSerialization serialization = new ShardSerializationImpl( client );
+
+        final Shard shard = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
+        serialization.createShard(shard);
+
+        final UUID newPointer = QakkaUtils.getTimeUuid();
+        shard.setPointer(newPointer);
+        serialization.updateShardPointer(shard);
+
+        assertEquals(newPointer, serialization.loadShard(shard).getPointer());
+    }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java
new file mode 100644
index 0000000..ea73abc
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategyTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+
+public class ShardStrategyTest extends AbstractTest {
+
+    private static final Logger logger = LoggerFactory.getLogger( ShardStrategyTest.class );
+
+    /**
+     * Creates ten shards (IDs 0-9) with time-UUID pointers, takes a fresh time UUID right after
+     * shard 4 was created, and expects selectShard to resolve that UUID to shard 4.
+     */
+    @Test
+    public void testBasicOperation() throws InterruptedException {
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession(); // result unused; presumably forces session setup - TODO confirm
+
+        ShardSerialization shardSer = getInjector().getInstance( ShardSerialization.class );
+        ShardStrategy shardStrategy = getInjector().getInstance( ShardStrategy.class );
+
+        UUID messageIdToLocate = null;
+        long selectedShardId = 4L;
+        int numShards = 10;
+        String region = "default";
+        String queueName = "sst_queue_" + RandomStringUtils.randomAlphanumeric(20);
+
+        for ( long i=0; i<numShards; i++) {
+            shardSer.createShard( new Shard( queueName, region, Shard.Type.DEFAULT, i, QakkaUtils.getTimeUuid()));
+            Thread.sleep(10); // an interrupt now fails the test instead of being silently swallowed
+            if ( i == selectedShardId ) {
+                messageIdToLocate = QakkaUtils.getTimeUuid();
+            }
+            Thread.sleep(10);
+        }
+
+        Assert.assertNotNull( "message ID must have been captured in the loop", messageIdToLocate );
+        Shard selectedShard = shardStrategy.selectShard( queueName, region, Shard.Type.DEFAULT, messageIdToLocate );
+        Assert.assertNotNull( "no shard selected for " + messageIdToLocate, selectedShard );
+        Assert.assertEquals( selectedShardId, selectedShard.getShardId() );
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardTest.java
new file mode 100644
index 0000000..fba135a
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Equality and hash-code checks for {@link Shard}.
+ * Created by russo on 6/9/16.
+ */
+public class ShardTest extends AbstractTest {
+
+    /** Two shards built from identical constructor arguments must compare equal. */
+    @Test
+    public void testEquals(){
+        final Shard first = new Shard("test", "region", Shard.Type.DEFAULT, 100L, null);
+        final Shard second = new Shard("test", "region", Shard.Type.DEFAULT, 100L, null);
+
+        assertEquals(first, second);
+    }
+
+    /** Two shards built from identical constructor arguments must report the same hash code. */
+    @Test
+    public void testHashCode(){
+        final Shard first = new Shard("test", "region", Shard.Type.DEFAULT, 10000000000L, null);
+        final Shard second = new Shard("test", "region", Shard.Type.DEFAULT, 10000000000L, null);
+        final int firstHash = first.hashCode();
+        final int secondHash = second.hashCode();
+
+        assertEquals(firstHash, secondHash);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java
new file mode 100644
index 0000000..20b72b0
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerializationTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.transferlog;
+
+import com.datastax.driver.core.PagingState;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaException;
+import org.apache.usergrid.persistence.qakka.serialization.Result;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+
+/**
+ * Tests recording, paging through and removing entries via {@link TransferLogSerialization}.
+ */
+public class TransferLogSerializationTest extends AbstractTest {
+
+    /**
+     * Records 100 transfer logs for a uniquely named queue, reads all logs back 10 per fetch,
+     * and expects exactly 100 of them to belong to that queue.
+     */
+    @Test
+    public void recordTransferLog() throws Exception {
+
+        TransferLogSerialization logSerialization = getInjector().getInstance( TransferLogSerialization.class );
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+        String queueName = "tlst_queue_" + RandomStringUtils.randomAlphanumeric( 15 );
+        String source = RandomStringUtils.randomAlphanumeric( 15 );
+        String dest = RandomStringUtils.randomAlphanumeric( 15 );
+
+        int numLogs = 100;
+
+        for ( int i=0; i<numLogs; i++ ) {
+            logSerialization.recordTransferLog( queueName, source, dest, UUIDGen.getTimeUUID());
+        }
+
+        // fetch size 10, as before; presumably small so that reading back needs several fetches
+        List<TransferLog> logs = logsForQueue( getTransferLogs( logSerialization, 10 ), queueName );
+
+        Assert.assertEquals( numLogs, logs.size() );
+    }
+
+    /**
+     * Expects a recorded log to be listed, to be gone after removal, and a second removal of the
+     * same log to throw a QakkaException.
+     */
+    @Test
+    public void removeTransferLog() throws Exception {
+
+        TransferLogSerialization logSerialization = getInjector().getInstance( TransferLogSerialization.class );
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+        cassandraClient.getSession();
+
+        String queueName = "tlst_queue_" + RandomStringUtils.randomAlphanumeric( 15 );
+        String source = RandomStringUtils.randomAlphanumeric( 15 );
+        String dest = RandomStringUtils.randomAlphanumeric( 15 );
+
+        UUID messageId = UUIDGen.getTimeUUID();
+        logSerialization.recordTransferLog( queueName, source, dest, messageId );
+
+        List<TransferLog> logs = logsForQueue( getTransferLogs( logSerialization ), queueName );
+        Assert.assertEquals( 1, logs.size());
+
+        logSerialization.removeTransferLog( queueName, source, dest, messageId );
+
+        logs = logsForQueue( getTransferLogs( logSerialization ), queueName );
+        Assert.assertEquals( 0, logs.size());
+
+        try {
+            logSerialization.removeTransferLog( queueName, source, dest, messageId );
+            Assert.fail("Removing non-existent log should throw exception");
+
+        } catch ( QakkaException expected ) {
+            // success!
+        }
+    }
+
+    /** Keeps only the logs whose queue name equals the given one; we only want entities for our queue. */
+    private static List<TransferLog> logsForQueue( List<TransferLog> allLogs, String queueName ) {
+        return allLogs.stream()
+            .filter( log -> log.getQueueName().equals( queueName ) ).collect( Collectors.toList() );
+    }
+
+    /** Reads every transfer log, 100 per fetch. */
+    private List<TransferLog> getTransferLogs(TransferLogSerialization logSerialization) {
+        return getTransferLogs( logSerialization, 100 );
+    }
+
+    /**
+     * Reads every transfer log, fetchSize per call, following the returned paging state until it is null.
+     */
+    private List<TransferLog> getTransferLogs(TransferLogSerialization logSerialization, int fetchSize) {
+        PagingState pagingState = null;
+        List<TransferLog> allLogs = new ArrayList<>();
+        do {
+            Result<TransferLog> result = logSerialization.getAllTransferLogs( pagingState, fetchSize );
+            allLogs.addAll( result.getEntities() );
+            pagingState = result.getPagingState();
+        } while ( pagingState != null );
+        return allLogs;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
index 69655e5..4b6e9d3 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
@@ -20,72 +20,67 @@
package org.apache.usergrid.persistence.queue;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.App;
+import org.apache.usergrid.persistence.qakka.QakkaModule;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.queue.guice.QueueModule;
+import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
+import org.junit.Ignore;
+import org.junit.Test;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
-import org.apache.usergrid.persistence.core.test.ITRunner;
-import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.queue.guice.TestQueueModule;
-import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
-
-import com.google.inject.Inject;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeTrue;
-
-@RunWith( ITRunner.class )
-@UseModules( { TestQueueModule.class } )
-public class LegacyQueueManagerTest {
- @Inject
- protected LegacyQueueFig queueFig;
- @Inject
- protected LegacyQueueManagerFactory qmf;
+public class LegacyQueueManagerTest extends AbstractTest {
- /**
- * Mark tests as ignored if no AWS creds are present
- */
- @Rule
- public NoAWSCredsRule awsCredsRule = new NoAWSCredsRule();
+ public static long queueSeed = System.currentTimeMillis();
+ // Every call builds a new Guice injector from QueueModule alone, so each test can work with its own injector
+ @Override
+ protected Injector getInjector() {
+ return Guice.createInjector( new QueueModule() );
+ }
- protected LegacyQueueScope scope;
- private LegacyQueueManager qm;
- public static long queueSeed = System.currentTimeMillis();
+ @Test
+ public void send() throws Exception{
+ Injector myInjector = getInjector();
- @Before
- public void mockApp() {
+ CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class );
+ cassandraClient.getSession();
- this.scope = new LegacyQueueScopeImpl( "testQueue"+queueSeed++, LegacyQueueScope.RegionImplementation.LOCAL);
- qm = qmf.getQueueManager(scope);
- }
+ ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class );
+ String region = actorSystemFig.getRegionLocal();
- @org.junit.After
- public void cleanup(){
- qm.deleteQueue();
- }
+ App app = myInjector.getInstance( App.class );
+ app.start( "localhost", getNextAkkaPort(), region );
+ final LegacyQueueScopeImpl scope =
+ new LegacyQueueScopeImpl( "testQueue" + queueSeed++, LegacyQueueScope.RegionImplementation.LOCAL );
+ LegacyQueueManagerFactory qmf = myInjector.getInstance( LegacyQueueManagerFactory.class );
+ LegacyQueueManager qm = qmf.getQueueManager(scope);
- @Test
- public void send() throws Exception{
String value = "bodytest";
qm.sendMessage(value);
+
+ Thread.sleep(5000);
+
List<LegacyQueueMessage> messageList = qm.getMessages(1, String.class);
assertTrue(messageList.size() >= 1);
for(LegacyQueueMessage message : messageList){
- assertTrue(message.getBody().equals(value));
+ assertEquals( value, message.getBody() );
qm.commitMessage(message);
}
@@ -96,12 +91,32 @@ public class LegacyQueueManagerTest {
@Test
public void sendMore() throws Exception{
+
+ Injector myInjector = getInjector();
+
+ CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class );
+ cassandraClient.getSession();
+
+ ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class );
+ String region = actorSystemFig.getRegionLocal();
+
+ App app = myInjector.getInstance( App.class );
+ app.start( "localhost", getNextAkkaPort(), region );
+
+ final LegacyQueueScopeImpl scope =
+ new LegacyQueueScopeImpl( "testQueue" + queueSeed++, LegacyQueueScope.RegionImplementation.LOCAL );
+ LegacyQueueManagerFactory qmf = myInjector.getInstance( LegacyQueueManagerFactory.class );
+ LegacyQueueManager qm = qmf.getQueueManager(scope);
+
HashMap<String,String> values = new HashMap<>();
values.put("test","Test");
List<Map<String,String>> bodies = new ArrayList<>();
bodies.add(values);
qm.sendMessages(bodies);
+
+ Thread.sleep(5000);
+
List<LegacyQueueMessage> messageList = qm.getMessages(1, values.getClass());
assertTrue(messageList.size() >= 1);
for(LegacyQueueMessage message : messageList){
@@ -115,7 +130,25 @@ public class LegacyQueueManagerTest {
}
@Test
+ @Ignore("Not implemented yet")
public void queueSize() throws Exception{
+
+ Injector myInjector = getInjector();
+
+ CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class );
+ cassandraClient.getSession();
+
+ ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class );
+ String region = actorSystemFig.getRegionLocal();
+
+ App app = myInjector.getInstance( App.class );
+ app.start( "localhost", getNextAkkaPort(), region );
+
+ final LegacyQueueScopeImpl scope =
+ new LegacyQueueScopeImpl( "testQueue" + queueSeed++, LegacyQueueScope.RegionImplementation.LOCAL );
+ LegacyQueueManagerFactory qmf = myInjector.getInstance( LegacyQueueManagerFactory.class );
+ LegacyQueueManager qm = qmf.getQueueManager(scope);
+
HashMap<String,String> values = new HashMap<>();
values.put("test", "Test");
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java
index 8390672..70e3543 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/guice/TestQueueModule.java
@@ -18,7 +18,6 @@
package org.apache.usergrid.persistence.queue.guice;
-import org.apache.usergrid.persistence.core.guice.CommonModule;
import org.apache.usergrid.persistence.core.guice.TestModule;
@@ -26,7 +25,6 @@ public class TestQueueModule extends TestModule {
@Override
protected void configure() {
- install( new CommonModule());
install( new QueueModule() );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/resources/cassandra.yaml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/cassandra.yaml b/stack/corepersistence/queue/src/test/resources/cassandra.yaml
new file mode 100644
index 0000000..e97bf00
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/resources/cassandra.yaml
@@ -0,0 +1,53 @@
+
+cluster_name: 'Embedded Test Cluster'
+
+# ports
+storage_port: 7075
+listen_address: localhost
+rpc_address: localhost
+rpc_port: 9160
+native_transport_port: 9042
+
+# data files
+data_file_directories:
+ - target/embeddedCassandra/data
+commitlog_directory: target/embeddedCassandra/commitlog
+saved_caches_directory: target/embeddedCassandra/saved_caches
+
+# native transport!
+start_native_transport: true
+
+# other stuff
+start_rpc: true
+initial_token:
+auto_bootstrap: false
+hinted_handoff_enabled: true
+authenticator: org.apache.cassandra.auth.AllowAllAuthenticator
+partitioner: org.apache.cassandra.dht.RandomPartitioner
+seed_provider:
+ - class_name: org.apache.cassandra.locator.SimpleSeedProvider
+ parameters:
+ - seeds: "127.0.0.1"
+commitlog_sync: periodic
+commitlog_sync_period_in_ms: 10000
+disk_access_mode: auto
+concurrent_reads: 2
+concurrent_writes: 4
+rpc_keepalive: true
+thrift_framed_transport_size_in_mb: 15
+thrift_max_message_length_in_mb: 16
+snapshot_before_compaction: false
+column_index_size_in_kb: 64
+endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
+dynamic_snitch: true
+dynamic_snitch_update_interval_in_ms: 100
+dynamic_snitch_reset_interval_in_ms: 600000
+dynamic_snitch_badness_threshold: 0.0
+request_scheduler: org.apache.cassandra.scheduler.NoScheduler
+server_encryption_options:
+ internode_encryption: none
+ keystore: conf/.keystore
+ keystore_password: cassandra
+ truststore: conf/.truststore
+ truststore_password: cassandra
+index_interval: 128
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties
new file mode 100644
index 0000000..3c679f5
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.rootLogger=ERROR,stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) %c{1} - %m%n
+
+# Usergrid packages log at DEBUG; Cassandra and Glassfish are limited to WARN
+log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG
+log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
+
+log4j.logger.org.apache.cassandra=WARN
+log4j.logger.org.glassfish=WARN
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/resources/qakka-duck.jpg
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka-duck.jpg b/stack/corepersistence/queue/src/test/resources/qakka-duck.jpg
new file mode 100644
index 0000000..8a0e0a2
Binary files /dev/null and b/stack/corepersistence/queue/src/test/resources/qakka-duck.jpg differ
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
new file mode 100644
index 0000000..c3b613c
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# Properties for JUnit tests
+
+usergrid.cluster_name=Test Cluster
+
+usergrid.cluster.hostname=localhost
+
+# Comma-separated list of regions to be considered
+usergrid.cluster.region.list=us-east
+
+# The region of this local instance of Usergrid
+usergrid.cluster.region.local=us-east
+
+# Comma-separated list of cluster seeds, each with the format {region}:{hostname}
+usergrid.cluster.seeds=us-east:localhost
+
+# Port used for cluster communications.
+usergrid.cluster.port=2551
+
+queue.writer.num.actors=100
+
+# Set the shard size and shard-allocation timings low for testing purposes
+queue.shard.max.size=500
+queue.shard.allocation.check.frequency.millis=100
+queue.shard.allocation.advance.time.millis=200
+
+queue.max.inmemory.shard.counter = 100
+
+cassandra.hosts=localhost
+
+cassandra.keyspace.application=qakka_test
+
+cassandra.keyspace-drop-and-create=true
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index 5186a13..d739bb4 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -100,9 +100,9 @@
<amber-version>0.22-incubating</amber-version>
<astyanax.version>3.9.0</astyanax.version>
<aws.version>1.10.20</aws.version>
- <cassandra-version>1.2.18</cassandra-version>
+ <cassandra-version>2.1.14</cassandra-version>
<guava.version>18.0</guava.version>
- <guice.version>4.0-beta5</guice.version>
+ <guice.version>4.0</guice.version>
<hector-om-version>3.0-03</hector-om-version>
<hector-version>1.1-4</hector-version>
<hector-test-version>1.1-4</hector-test-version>
@@ -110,7 +110,7 @@
<jackson-version>1.9.9</jackson-version>
<jackson-2-version>2.3.3</jackson-2-version>
<jclouds.version>1.9.0</jclouds.version>
- <jersey-version>2.21</jersey-version>
+ <jersey-version>2.23.1</jersey-version>
<junit-version>4.12</junit-version>
<log4j-version>1.2.16</log4j-version>
<org.springframework.version>3.2.13.RELEASE</org.springframework.version>
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
index 3d4911d..8356f16 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
@@ -16,24 +16,26 @@
*/
package org.apache.usergrid.services.notifications.apns;
-import com.relayrides.pushy.apns.*;
-import com.relayrides.pushy.apns.util.*;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.relayrides.pushy.apns.PushManager;
+import com.relayrides.pushy.apns.PushManagerConfiguration;
+import com.relayrides.pushy.apns.util.SimpleApnsPushNotification;
+import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.entities.Notification;
import org.apache.usergrid.persistence.entities.Notifier;
-import org.mortbay.util.ajax.JSON;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
import org.apache.usergrid.services.ServicePayload;
import org.apache.usergrid.services.notifications.ConnectionException;
import org.apache.usergrid.services.notifications.ProviderAdapter;
import org.apache.usergrid.services.notifications.TaskTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
/**
* Adapter for Apple push notifications
@@ -47,6 +49,8 @@ public class APNsAdapter implements ProviderAdapter {
private static final String TEST_TOKEN = "ff026b5a4d2761ef13843e8bcab9fc83b47f1dfbd1d977d225ab296153ce06d6";
private static final String TEST_PAYLOAD = "{}";
+ private static ObjectMapper objectMapper = new ObjectMapper();
+
static {
validEnvironments.add("development");
validEnvironments.add("production");
@@ -155,7 +159,7 @@ public class APNsAdapter implements ProviderAdapter {
payload = "{\"aps\":{\"alert\":\"" + payload + "\"}}";
}
} else {
- payload = JSON.toString(objPayload);
+ payload = objectMapper.writeValueAsString( objPayload );
}
if (payload.length() > 2048) {
throw new IllegalArgumentException(
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
index 7929ad4..6b619b7 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
@@ -16,11 +16,12 @@
*/
package org.apache.usergrid.services.notifications.gcm;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.android.gcm.server.*;
import org.apache.usergrid.persistence.entities.Notification;
import org.apache.usergrid.persistence.entities.Notifier;
import org.apache.usergrid.services.notifications.InactiveDeviceManager;
-import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +45,8 @@ public class GCMAdapter implements ProviderAdapter {
private final Notifier notifier;
private EntityManager entityManager;
+ private static ObjectMapper objectMapper = new ObjectMapper();
+
private ConcurrentHashMap<Long,Batch> batches;
private static final String ttlKey = "time_to_live";
@@ -147,9 +150,9 @@ public class GCMAdapter implements ProviderAdapter {
throw new IllegalArgumentException(
"GCM Payload must be either a Map or a String");
}
- if (JSON.toString(mapPayload).length() > 4096) {
- throw new IllegalArgumentException(
- "GCM payloads must be 4096 characters or less");
+ String payloadString = objectMapper.writeValueAsString( mapPayload );
+ if ( payloadString.length() > 4096) {
+ throw new IllegalArgumentException( "GCM payloads must be 4096 characters or less");
}
return mapPayload;
}