You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/16 22:34:38 UTC
[09/25] usergrid git commit: Initial integration of Qakka into
Usergrid Queue module,
and implementation of Qakka-based LegacyQueueManager implementation.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
new file mode 100644
index 0000000..9158412
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.qakka.serialization.sharding.impl;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
+import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.MessageFormat;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Stores per-shard counters in the {@code shard_counters} Cassandra counter table.
+ *
+ * <p>Increments are buffered in memory per (queue name, shard type, shard id) and written
+ * to Cassandra once the buffered amount exceeds the limit read from
+ * {@link QakkaFig#getMaxInMemoryShardCounter()}. {@link #getCounterValue} reports the last
+ * value read from storage plus the increments buffered by this instance since then.
+ */
+@Singleton
+public class ShardCounterSerializationImpl implements ShardCounterSerialization {
+    private static final Logger logger = LoggerFactory.getLogger( ShardCounterSerializationImpl.class );
+
+    private final CassandraClient cassandraClient;
+
+    final static String TABLE_COUNTERS = "shard_counters";
+    final static String COLUMN_QUEUE_NAME = "queue_name";
+    final static String COLUMN_SHARD_ID = "shard_id";
+    final static String COLUMN_COUNTER_VALUE = "counter_value";
+    final static String COLUMN_SHARD_TYPE = "shard_type";
+
+    static final String CQL =
+        "CREATE TABLE IF NOT EXISTS shard_counters ( " +
+            "counter_value counter, " +
+            "queue_name varchar, " +
+            "shard_type varchar, " +
+            "shard_id bigint, " +
+            "PRIMARY KEY (queue_name, shard_type, shard_id) " +
+            "); ";
+
+    /** Buffered increments above this amount are flushed to Cassandra. */
+    final long maxInMemoryIncrement;
+
+    /**
+     * Cached counter: the last known stored value plus the increments not yet written
+     * to storage. Reading the value and changing the base synchronize on the instance,
+     * the same monitor the flush in {@code incrementCounter} holds, so a reader never
+     * observes a half-applied flush.
+     */
+    class InMemoryCount {
+        long baseCount;
+        final AtomicLong increment = new AtomicLong( 0L );
+
+        InMemoryCount( long baseCount ) {
+            this.baseCount = baseCount;
+        }
+
+        /** @return base count plus buffered increments, read as one consistent pair. */
+        public synchronized long value() {
+            return baseCount + increment.get();
+        }
+
+        public AtomicLong getIncrement() {
+            return increment;
+        }
+
+        synchronized void setBaseCount( long baseCount ) {
+            this.baseCount = baseCount;
+        }
+
+        /** Moves the buffered increments into the base count and clears the buffer. */
+        synchronized void foldIncrementIntoBase() {
+            baseCount += increment.getAndSet( 0L );
+        }
+    }
+
+    private final Map<String, InMemoryCount> inMemoryCounters = new ConcurrentHashMap<>(200);
+
+
+    @Inject
+    public ShardCounterSerializationImpl(QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+        this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter();
+        this.cassandraClient = cassandraClient;
+    }
+
+
+    /**
+     * Adds {@code increment} to the counter of the given shard.
+     *
+     * <p>The first call for a shard loads the stored value (writing a zero increment to
+     * storage when no row exists yet) and only buffers the increment. Later calls buffer
+     * the increment and, once the buffered total exceeds {@link #maxInMemoryIncrement},
+     * write it to storage and refresh the cached base from storage.
+     *
+     * @param queueName name of the queue the shard belongs to
+     * @param type      shard type
+     * @param shardId   shard id
+     * @param increment amount to add
+     */
+    @Override
+    public void incrementCounter(String queueName, Shard.Type type, long shardId, long increment ) {
+
+        String key = counterKey( queueName, type, shardId );
+
+        synchronized ( inMemoryCounters ) {
+
+            if ( inMemoryCounters.get( key ) == null ) {
+
+                Long value = retrieveCounterFromStorage( queueName, type, shardId );
+
+                InMemoryCount created;
+                if ( value == null ) {
+                    // zero increment: presumably issued so that later reads find a row
+                    incrementCounterInStorage( queueName, type, shardId, 0L );
+                    created = new InMemoryCount( 0L );
+                } else {
+                    created = new InMemoryCount( value );
+                }
+                inMemoryCounters.put( key, created );
+                created.getIncrement().addAndGet( increment );
+                return;
+            }
+        }
+
+        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+
+        synchronized ( inMemoryCount ) {
+            long totalIncrement = inMemoryCount.getIncrement().addAndGet( increment );
+
+            if (totalIncrement > maxInMemoryIncrement) {
+                incrementCounterInStorage( queueName, type, shardId, totalIncrement );
+
+                // Storage now holds the buffered amount: fold it into the base right away so
+                // that a failure of the re-read below cannot cause it to be written twice.
+                inMemoryCount.foldIncrementIntoBase();
+
+                // A missing row would previously have failed with an NPE while unboxing;
+                // in that case keep the locally computed base.
+                Long stored = retrieveCounterFromStorage( queueName, type, shardId );
+                if ( stored != null ) {
+                    inMemoryCount.setBaseCount( stored );
+                }
+            }
+        }
+
+    }
+
+
+    /**
+     * Returns the cached counter value for the given shard, loading it from storage on
+     * first access.
+     *
+     * @param queueName name of the queue the shard belongs to
+     * @param type      shard type
+     * @param shardId   shard id
+     * @return last value read from storage plus locally buffered increments
+     * @throws NotFoundException if the counter is neither cached nor present in storage
+     */
+    @Override
+    public long getCounterValue( String queueName, Shard.Type type, long shardId ) {
+
+        String key = counterKey( queueName, type, shardId );
+
+        InMemoryCount inMemoryCount;
+        synchronized ( inMemoryCounters ) {
+
+            inMemoryCount = inMemoryCounters.get( key );
+            if ( inMemoryCount == null ) {
+
+                Long value = retrieveCounterFromStorage( queueName, type, shardId );
+
+                if ( value == null ) {
+                    // shard id is passed as text: MessageFormat would otherwise render the
+                    // long with locale-specific grouping separators
+                    throw new NotFoundException(
+                        MessageFormat.format( "No counter found for queue {0} type {1} shardId {2}",
+                            queueName, type, Long.toString( shardId ) ));
+                }
+                inMemoryCount = new InMemoryCount( value );
+                inMemoryCounters.put( key, inMemoryCount );
+            }
+        }
+
+        return inMemoryCount.value();
+    }
+
+    /** Builds the key under which a shard's counter is cached in {@link #inMemoryCounters}. */
+    private static String counterKey( String queueName, Shard.Type type, long shardId ) {
+        return queueName + type + shardId;
+    }
+
+    /** Issues a counter update adding {@code increment} to the stored value of the shard. */
+    void incrementCounterInStorage( String queueName, Shard.Type type, long shardId, long increment ) {
+
+        Statement update = QueryBuilder.update( TABLE_COUNTERS )
+            .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) )
+            .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString() ) )
+            .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) )
+            .with( QueryBuilder.incr( COLUMN_COUNTER_VALUE, increment ) );
+        cassandraClient.getSession().execute( update );
+    }
+
+
+    /**
+     * Reads the stored counter value of the shard.
+     *
+     * @return the stored value, or {@code null} when no row exists
+     * @throws QakkaRuntimeException if more than one row matches
+     */
+    Long retrieveCounterFromStorage( String queueName, Shard.Type type, long shardId ) {
+
+        Statement query = QueryBuilder.select().from( TABLE_COUNTERS )
+            .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) )
+            .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString()) )
+            .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) );
+
+        ResultSet resultSet = cassandraClient.getSession().execute( query );
+        List<Row> all = resultSet.all();
+
+        if ( all.size() > 1 ) {
+            throw new QakkaRuntimeException(
+                "Multiple rows for counter " + queueName + " type " + type + " shardId " + shardId );
+        }
+        if ( all.isEmpty() ) {
+            return null;
+        }
+        return all.get(0).getLong( COLUMN_COUNTER_VALUE );
+    }
+
+    @Override
+    public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public Collection<TableDefinition> getTables() {
+        return Collections.singletonList( new TableDefinitionStringImpl( TABLE_COUNTERS, CQL ) );
+    }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
new file mode 100644
index 0000000..7b9fd8e
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding.impl;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Assignment;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
+import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+
+
+/**
+ * Reads and writes shard rows in the {@code shards_messages_available} and
+ * {@code shards_messages_inflight} tables; the table is chosen from the shard's type.
+ *
+ * <p>Rows are written with {@code active = 1}, and every lookup, update and delete in this
+ * class addresses only rows with {@code active = 1}.
+ */
+public class ShardSerializationImpl implements ShardSerialization {
+
+    private static final Logger logger = LoggerFactory.getLogger( ShardSerializationImpl.class );
+
+    private final CassandraClient cassandraClient;
+
+    public final static String COLUMN_QUEUE_NAME = "queue_name";
+    public final static String COLUMN_REGION = "region";
+    public final static String COLUMN_SHARD_ID = "shard_id";
+    public final static String COLUMN_ACTIVE = "active";
+    public final static String COLUMN_POINTER = "pointer";
+
+
+    public final static String TABLE_SHARDS_MESSAGES_AVAILABLE = "shards_messages_available";
+
+    public final static String TABLE_SHARDS_MESSAGES_INFLIGHT = "shards_messages_inflight";
+
+
+    static final String SHARDS_MESSAGES_AVAILABLE =
+        "CREATE TABLE IF NOT EXISTS shards_messages_available ( " +
+            "queue_name text, " +
+            "region text, " +
+            "shard_id bigint, " +
+            "active int, " +
+            "pointer timeuuid, " +
+            "PRIMARY KEY ((queue_name, region), active, shard_id) " +
+            ") WITH CLUSTERING ORDER BY (active DESC, shard_id ASC); ";
+
+    static final String SHARDS_MESSAGES_AVAILABLE_INFLIGHT =
+        "CREATE TABLE IF NOT EXISTS shards_messages_inflight ( " +
+            "queue_name text, " +
+            "region text, " +
+            "shard_id bigint, " +
+            "active int, " +
+            "pointer timeuuid, " +
+            "PRIMARY KEY ((queue_name, region), active, shard_id) " +
+            ") WITH CLUSTERING ORDER BY (active DESC, shard_id ASC); ";
+
+
+    @Inject
+    public ShardSerializationImpl( CassandraClient cassandraClient ) {
+        this.cassandraClient = cassandraClient;
+    }
+
+    /**
+     * Inserts a row for the shard, marked active, into the table for its type.
+     *
+     * @param shard shard to persist
+     */
+    public void createShard(final Shard shard){
+
+        Statement insert = QueryBuilder.insertInto(getTableName(shard.getType()))
+            .value(COLUMN_QUEUE_NAME, shard.getQueueName())
+            .value(COLUMN_REGION, shard.getRegion())
+            .value(COLUMN_SHARD_ID, shard.getShardId())
+            .value(COLUMN_ACTIVE, 1)
+            .value(COLUMN_POINTER, shard.getPointer());
+
+        cassandraClient.getSession().execute(insert);
+
+    }
+
+    /**
+     * Loads the active row matching the queue name, region, type and shard id of {@code shard}.
+     *
+     * @param shard shard whose key fields identify the row
+     * @return a new shard built from the stored row, or {@code null} if no row matches
+     */
+    public Shard loadShard(final Shard shard){
+
+        Statement select = QueryBuilder.select().from(getTableName(shard.getType()))
+            .where(queueNameIs(shard))
+            .and(regionIs(shard))
+            .and(isActive())
+            .and(shardIdIs(shard));
+
+        Row row = cassandraClient.getSession().execute(select).one();
+
+        if (row == null){
+            return null;
+        }
+
+        final String queueName = row.getString(COLUMN_QUEUE_NAME);
+        final String region = row.getString(COLUMN_REGION);
+        final long shardId = row.getLong(COLUMN_SHARD_ID);
+        final UUID pointer = row.getUUID(COLUMN_POINTER);
+
+        return new Shard(queueName, region, shard.getType(), shardId, pointer);
+
+    }
+
+
+    /**
+     * Deletes the active row matching the queue name, region, type and shard id of {@code shard}.
+     *
+     * @param shard shard whose key fields identify the row
+     */
+    public void deleteShard(final Shard shard){
+
+        Statement delete = QueryBuilder.delete().from(getTableName(shard.getType()))
+            .where(queueNameIs(shard))
+            .and(regionIs(shard))
+            .and(isActive())
+            .and(shardIdIs(shard));
+
+        cassandraClient.getSession().execute(delete);
+
+    }
+
+    /**
+     * Sets the pointer column of the active row identified by {@code shard} to the shard's
+     * current pointer.
+     *
+     * @param shard shard carrying the key fields and the new pointer
+     */
+    public void updateShardPointer(final Shard shard){
+
+        Assignment assignment = QueryBuilder.set(COLUMN_POINTER, shard.getPointer());
+
+        Statement update = QueryBuilder.update(getTableName(shard.getType()))
+            .with(assignment)
+            .where(queueNameIs(shard))
+            .and(regionIs(shard))
+            .and(isActive())
+            .and(shardIdIs(shard));
+
+        cassandraClient.getSession().execute(update);
+
+    }
+
+    /**
+     * Maps a shard type to the table that stores shards of that type.
+     *
+     * @param shardType shard type
+     * @return table name for the type
+     * @throws IllegalArgumentException if the type is {@code null} or not DEFAULT/INFLIGHT
+     */
+    public static String getTableName(Shard.Type shardType){
+
+        // constant-first equals: a null type reaches the IllegalArgumentException below
+        // instead of failing with a NullPointerException
+        if ( Shard.Type.DEFAULT.equals(shardType) ) {
+            return TABLE_SHARDS_MESSAGES_AVAILABLE;
+        }
+        if ( Shard.Type.INFLIGHT.equals(shardType) ) {
+            return TABLE_SHARDS_MESSAGES_INFLIGHT;
+        }
+        throw new IllegalArgumentException("Unknown ShardType");
+
+    }
+
+    /** Predicate on the queue name of the shard. */
+    private static Clause queueNameIs(final Shard shard) {
+        return QueryBuilder.eq(COLUMN_QUEUE_NAME, shard.getQueueName());
+    }
+
+    /** Predicate on the region of the shard. */
+    private static Clause regionIs(final Shard shard) {
+        return QueryBuilder.eq(COLUMN_REGION, shard.getRegion());
+    }
+
+    /** Predicate restricting a statement to rows with {@code active = 1}. */
+    private static Clause isActive() {
+        return QueryBuilder.eq(COLUMN_ACTIVE, 1);
+    }
+
+    /** Predicate on the id of the shard. */
+    private static Clause shardIdIs(final Shard shard) {
+        return QueryBuilder.eq(COLUMN_SHARD_ID, shard.getShardId());
+    }
+
+    @Override
+    public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public Collection<TableDefinition> getTables() {
+        return Lists.newArrayList(
+            new TableDefinitionStringImpl( TABLE_SHARDS_MESSAGES_AVAILABLE, SHARDS_MESSAGES_AVAILABLE ),
+            new TableDefinitionStringImpl( TABLE_SHARDS_MESSAGES_INFLIGHT, SHARDS_MESSAGES_AVAILABLE_INFLIGHT )
+        );
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardStrategyImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardStrategyImpl.java
new file mode 100644
index 0000000..cfd9a60
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardStrategyImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding.impl;
+
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardStrategy;
+
+import java.text.MessageFormat;
+import java.util.Optional;
+import java.util.UUID;
+
+
+/**
+ * Picks the shard for a message by walking the queue's shards with a {@link ShardIterator}
+ * and comparing pointer timestamps.
+ */
+public class ShardStrategyImpl implements ShardStrategy {
+
+    final CassandraClient cassandraClient;
+
+    @Inject
+    public ShardStrategyImpl(CassandraClient cassandraClient) {
+        this.cassandraClient = cassandraClient;
+    }
+
+    /**
+     * Selects the shard for the given pointer.
+     *
+     * @param queueName name of the queue
+     * @param region    region of the queue
+     * @param shardType type of shard to select from
+     * @param pointer   pointer of the item being placed; its timestamp is compared
+     *                  against the shard pointers
+     * @return the last shard visited before the first shard whose pointer timestamp is
+     *         greater than the item's, or the final shard when there is no such shard
+     * @throws NotFoundException if the iterator yields no shards at all
+     */
+    @Override
+    public Shard selectShard(String queueName, String region, Shard.Type shardType, UUID pointer) {
+
+        // the iterator walks the shards; no starting shard is given
+        final ShardIterator shards = new ShardIterator(
+            cassandraClient, queueName, region, shardType, Optional.empty() );
+
+        if ( !shardIterator_hasAny( shards ) ) {
+            throw new NotFoundException( MessageFormat.format(
+                "No shards found for queue {0} region {1} type {2}", queueName, region, shardType ) );
+        }
+
+        // walk through shards from oldest to newest
+        Shard selected = shards.next();
+        while ( shards.hasNext() ) {
+            final Shard candidate = shards.next();
+
+            // if item is older than the next shard, then use the previous shard
+            final long itemTime = pointer.timestamp();
+            final long candidateTime = candidate.getPointer().timestamp();
+            if ( itemTime < candidateTime ) {
+                break;
+            }
+            selected = candidate;
+        }
+        return selected;
+    }
+
+    /** Tells whether the iterator has at least one shard to offer. */
+    private static boolean shardIterator_hasAny( ShardIterator shards ) {
+        return shards.hasNext();
+    }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLog.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLog.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLog.java
new file mode 100644
index 0000000..048096d
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLog.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.transferlog;
+
+import java.util.UUID;
+
+
+/**
+ * Mutable value holder for one transfer log entry: the queue, the source and destination
+ * regions, the message id and the transfer time.
+ */
+public class TransferLog {
+    String queueName;
+    String sourceRegion;
+    String destRegion;
+    UUID messageId;
+    long transfer_time;
+
+    /**
+     * Creates an entry with all fields set.
+     *
+     * @param queueName     name of the queue
+     * @param sourceRegion  source region
+     * @param destRegion    destination region
+     * @param messageId     id of the message
+     * @param transfer_time transfer time value to store
+     */
+    public TransferLog(
+            String queueName,
+            String sourceRegion,
+            String destRegion,
+            UUID messageId,
+            long transfer_time) {
+        this.transfer_time = transfer_time;
+        this.messageId = messageId;
+        this.destRegion = destRegion;
+        this.sourceRegion = sourceRegion;
+        this.queueName = queueName;
+    }
+
+    /** @return name of the queue */
+    public String getQueueName() {
+        return this.queueName;
+    }
+
+    /** @param queueName name of the queue */
+    public void setQueueName(String queueName) {
+        this.queueName = queueName;
+    }
+
+    /** @return source region */
+    public String getSourceRegion() {
+        return this.sourceRegion;
+    }
+
+    /** @param sourceRegion source region */
+    public void setSourceRegion(String sourceRegion) {
+        this.sourceRegion = sourceRegion;
+    }
+
+    /** @return destination region */
+    public String getDestRegion() {
+        return this.destRegion;
+    }
+
+    /** @param destRegion destination region */
+    public void setDestRegion(String destRegion) {
+        this.destRegion = destRegion;
+    }
+
+    /** @return id of the message */
+    public UUID getMessageId() {
+        return this.messageId;
+    }
+
+    /** @param messageId id of the message */
+    public void setMessageId(UUID messageId) {
+        this.messageId = messageId;
+    }
+
+    /** @return stored transfer time value */
+    public long getTransfer_time() {
+        return this.transfer_time;
+    }
+
+    /** @param transfer_time transfer time value to store */
+    public void setTransfer_time(long transfer_time) {
+        this.transfer_time = transfer_time;
+    }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerialization.java
new file mode 100644
index 0000000..ea155d7
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/TransferLogSerialization.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.transferlog;
+
+import com.datastax.driver.core.PagingState;
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaException;
+import org.apache.usergrid.persistence.qakka.serialization.Result;
+
+import java.util.UUID;
+
+
+/**
+ * Storage operations for transfer log records. A record is described by a queue name,
+ * a source region, a destination region and a message id.
+ */
+public interface TransferLogSerialization extends Migration {
+
+ /**
+ * Record a transfer log entry for a message.
+ *
+ * @param queueName Name of queue.
+ * @param source Source region.
+ * @param dest Destination region.
+ * @param messageId UUID of message in message_data table.
+ */
+ void recordTransferLog(
+ String queueName, String source, String dest, UUID messageId);
+
+ /**
+ * Remove a transfer log entry.
+ *
+ * @param queueName Name of queue.
+ * @param source Source region.
+ * @param dest Destination region.
+ * @param messageId UUID of message in message_data table.
+ * @throws QakkaException If transfer log message was not found or could not be removed.
+ */
+ void removeTransferLog(
+ String queueName, String source, String dest, UUID messageId) throws QakkaException;
+
+ /**
+ * Get all transfer logs (for testing purposes), one page per call.
+ *
+ * @param pagingState Paging state (or null if none)
+ * @param fetchSize Number of rows to be fetched per page (or -1 for default)
+ * @return Result exposing the fetched transfer logs and a paging state.
+ */
+ Result<TransferLog> getAllTransferLogs(PagingState pagingState, int fetchSize);
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java
new file mode 100644
index 0000000..f9fb0dc
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.transferlog.impl;
+
+import com.datastax.driver.core.PagingState;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
+import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaException;
+import org.apache.usergrid.persistence.qakka.serialization.Result;
+import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLog;
+import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+
+/**
+ * Cassandra implementation of {@link TransferLogSerialization} backed by the
+ * {@code transfer_log} table, whose primary key is
+ * (queue_name, dest_region, message_id).
+ */
+public class TransferLogSerializationImpl implements TransferLogSerialization {
+
+    private static final Logger logger = LoggerFactory.getLogger( TransferLogSerializationImpl.class );
+
+    private final CassandraClient cassandraClient;
+
+    public final static String TABLE_TRANSFER_LOG = "transfer_log";
+
+    public final static String COLUMN_QUEUE_NAME = "queue_name";
+    public final static String COLUMN_SOURCE_REGION = "source_region";
+    public final static String COLUMN_DEST_REGION = "dest_region";
+    public final static String COLUMN_MESSAGE_ID = "message_id";
+    public final static String COLUMN_TRANSFER_TIME = "transfer_time";
+
+    static final String CQL =
+        "CREATE TABLE IF NOT EXISTS transfer_log ( " +
+            "queue_name text, " +
+            "source_region text, " +
+            "dest_region text, " +
+            "message_id timeuuid, " +
+            "transfer_time bigint, " +
+            "PRIMARY KEY ((queue_name, dest_region, message_id)) " +
+            "); ";
+
+
+    @Inject
+    public TransferLogSerializationImpl( CassandraClient cassandraClient ) {
+        this.cassandraClient = cassandraClient;
+    }
+
+
+    /**
+     * Inserts a transfer log row stamped with the current system time in milliseconds.
+     *
+     * @param queueName name of queue
+     * @param source    source region
+     * @param dest      destination region
+     * @param messageId id of the message
+     */
+    @Override
+    public void recordTransferLog(
+        String queueName, String source, String dest, UUID messageId) {
+
+        Statement insert = QueryBuilder.insertInto(TABLE_TRANSFER_LOG)
+            .value(COLUMN_QUEUE_NAME, queueName )
+            .value(COLUMN_SOURCE_REGION, source )
+            .value(COLUMN_DEST_REGION, dest )
+            .value(COLUMN_MESSAGE_ID, messageId )
+            .value(COLUMN_TRANSFER_TIME, System.currentTimeMillis() );
+        cassandraClient.getSession().execute(insert);
+    }
+
+
+    /**
+     * Deletes the transfer log row for the given queue, destination region and message id,
+     * after checking that such a row exists.
+     *
+     * <p>{@code source} is not part of the lookup; it only appears in the error message.
+     *
+     * @param queueName name of queue
+     * @param source    source region
+     * @param dest      destination region
+     * @param messageId id of the message
+     * @throws QakkaException if no matching row is found
+     */
+    @Override
+    public void removeTransferLog(
+        String queueName, String source, String dest, UUID messageId ) throws QakkaException {
+
+        Statement query = QueryBuilder.select().all().from(TABLE_TRANSFER_LOG)
+            .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ))
+            .and( QueryBuilder.eq( COLUMN_DEST_REGION, dest ))
+            .and( QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId ));
+        ResultSet rs = cassandraClient.getSession().execute( query );
+
+        if ( rs.getAvailableWithoutFetching() == 0 ) {
+            throw new QakkaException(
+                "Transfer log entry not found for queueName=" + queueName
+                    + " source=" + source
+                    + " dest=" + dest
+                    + " messageId=" + messageId );
+        }
+
+        Statement deleteQuery = QueryBuilder.delete().from(TABLE_TRANSFER_LOG)
+            .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ))
+            .and( QueryBuilder.eq( COLUMN_DEST_REGION, dest ))
+            .and( QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId ));
+        cassandraClient.getSession().execute( deleteQuery );
+    }
+
+
+    /**
+     * Reads one page of the whole {@code transfer_log} table.
+     *
+     * @param pagingState paging state to resume from, or {@code null} to start at the beginning
+     * @param fetchSize   fetch size handed to the statement
+     * @return the rows available from this query without further fetching, together with
+     *         the paging state reported by the query's execution info
+     */
+    @Override
+    public Result<TransferLog> getAllTransferLogs(PagingState pagingState, int fetchSize ) {
+
+        Statement query = QueryBuilder.select().all().from(TABLE_TRANSFER_LOG);
+
+        query.setFetchSize( fetchSize );
+        if ( pagingState != null ) {
+            query.setPagingState( pagingState );
+        }
+
+        ResultSet rs = cassandraClient.getSession().execute( query );
+        final PagingState newPagingState = rs.getExecutionInfo().getPagingState();
+
+        // consume only the rows of the current page; reading further would trigger another fetch
+        int numReturned = rs.getAvailableWithoutFetching();
+        final List<TransferLog> transferLogs = new ArrayList<>( numReturned );
+        for ( int i=0; i<numReturned; i++ ) {
+            Row row = rs.one();
+            transferLogs.add( new TransferLog(
+                row.getString( COLUMN_QUEUE_NAME ),
+                row.getString( COLUMN_SOURCE_REGION ),
+                row.getString( COLUMN_DEST_REGION ),
+                row.getUUID( COLUMN_MESSAGE_ID ),
+                row.getLong( COLUMN_TRANSFER_TIME )) );
+        }
+
+        return new Result<TransferLog>() {
+
+            @Override
+            public PagingState getPagingState() {
+                return newPagingState;
+            }
+
+            @Override
+            public List<TransferLog> getEntities() {
+                return transferLogs;
+            }
+        };
+    }
+
+    @Override
+    public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public Collection<TableDefinition> getTables() {
+        return Collections.singletonList( new TableDefinitionStringImpl( TABLE_TRANSFER_LOG, CQL ) );
+    }
+
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
index 6d62da0..7bd0fa7 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -18,18 +18,18 @@
package org.apache.usergrid.persistence.queue.guice;
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import org.apache.usergrid.persistence.qakka.QakkaModule;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
import org.apache.usergrid.persistence.queue.LegacyQueueManagerInternalFactory;
+import org.apache.usergrid.persistence.queue.impl.QakkaQueueManager;
import org.apache.usergrid.persistence.queue.impl.QueueManagerFactoryImpl;
import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl;
import org.safehaus.guicyfig.GuicyFigModule;
-import org.apache.usergrid.persistence.queue.LegacyQueueFig;
-import org.apache.usergrid.persistence.queue.LegacyQueueManager;
-import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
-
/**
* Simple module for wiring our collection api
@@ -44,11 +44,11 @@ public class QueueModule extends AbstractModule {
install(new GuicyFigModule(LegacyQueueFig.class));
+ install( new QakkaModule() );
+
bind(LegacyQueueManagerFactory.class).to(QueueManagerFactoryImpl.class);
- install(new FactoryModuleBuilder().implement(LegacyQueueManager.class, SNSQueueManagerImpl.class)
+ install( new FactoryModuleBuilder().implement(LegacyQueueManager.class, QakkaQueueManager.class)
.build(LegacyQueueManagerInternalFactory.class));
}
-
-
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
new file mode 100644
index 0000000..c407a78
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue.impl;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.*;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
+import org.apache.usergrid.persistence.queue.LegacyQueueScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+
+public class QakkaQueueManager implements LegacyQueueManager {
+ private static final Logger logger = LoggerFactory.getLogger( QakkaQueueManager.class );
+
+ private final LegacyQueueScope scope;
+ private final LegacyQueueFig fig;
+ private final QueueManager queueManager;
+ private final QueueMessageManager queueMessageManager;
+ private final QakkaFig qakkaFig;
+ private final Regions regions;
+
+
+ @Inject
+ public QakkaQueueManager(
+ @Assisted LegacyQueueScope scope,
+ LegacyQueueFig fig,
+ QueueManager queueManager,
+ QueueMessageManager queueMessageManager,
+ QakkaFig qakkaFig,
+ Regions regions
+ ) {
+
+ this.scope = scope;
+ this.fig = fig;
+ this.queueManager = queueManager;
+ this.qakkaFig = qakkaFig;
+ this.queueMessageManager = queueMessageManager;
+ this.regions = regions;
+
+ if ( queueManager.getQueueConfig(scope.getName()) == null ) {
+
+ // TODO: read defaults from config
+ //queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
+
+ Queue queue = new Queue( scope.getName() );
+ queueManager.createQueue( queue );
+ }
+ }
+
+
+ @Override
+ public <T extends Serializable> void sendMessage(T body) throws IOException {
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(body);
+ oos.flush();
+ oos.close();
+ ByteBuffer byteBuffer = ByteBuffer.wrap( bos.toByteArray() );
+
+ queueMessageManager.sendMessages(
+ scope.getName(),
+ regions.getRegions( scope.getRegionImplementation().name() ),
+ null, // delay millis
+ null, // expiration seconds
+ "application/octet-stream",
+ DataType.serializeValue( byteBuffer, ProtocolVersion.NEWEST_SUPPORTED ));
+ }
+
+
+ @Override
+ public <T extends Serializable> void sendMessageToTopic(T body) throws IOException {
+ sendMessage( body );
+ }
+
+
+ @Override
+ public List<LegacyQueueMessage> getMessages(int limit, Class klass) {
+
+ List<LegacyQueueMessage> messages = new ArrayList<>();
+ List<QueueMessage> qakkaMessages = queueMessageManager.getNextMessages( scope.getName(), limit );
+
+ for ( QueueMessage qakkaMessage : qakkaMessages ) {
+
+ Object body;
+ try {
+ ByteBuffer messageData = queueMessageManager.getMessageData( qakkaMessage.getMessageId() );
+ ByteBuffer bb = (ByteBuffer)DataType.blob().deserialize(
+ messageData, ProtocolVersion.NEWEST_SUPPORTED );
+
+ ByteArrayInputStream bais = new ByteArrayInputStream( bb.array() );
+ ObjectInputStream ios = new ObjectInputStream( bais );
+ body = ios.readObject();
+
+ } catch (Throwable t) {
+ throw new QakkaRuntimeException( "Error de-serializing object", t );
+ }
+
+ LegacyQueueMessage legacyQueueMessage = new LegacyQueueMessage(
+ qakkaMessage.getQueueMessageId().toString(),
+ null, // handle
+ body,
+ null); // type
+
+ messages.add( legacyQueueMessage );
+ }
+
+ return messages;
+ }
+
+
+ @Override
+ public long getQueueDepth() {
+ return 0;
+ }
+
+
+ @Override
+ public void commitMessage(LegacyQueueMessage queueMessage) {
+
+ UUID queueMessageId = UUID.fromString( queueMessage.getMessageId() );
+ queueMessageManager.ackMessage( scope.getName(), queueMessageId );
+ }
+
+
+ @Override
+ public void commitMessages(List<LegacyQueueMessage> queueMessages) {
+
+ for ( LegacyQueueMessage message : queueMessages ) {
+ commitMessage( message );
+ }
+ }
+
+
+ @Override
+ public void sendMessages( List bodies ) throws IOException {
+
+ for ( Object body : bodies ) {
+ sendMessage( (Serializable)body );
+ }
+
+ }
+
+
+ @Override
+ public void deleteQueue() {
+ queueManager.deleteQueue( scope.getName() );
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
index 2d51903..c1bdc72 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
@@ -67,7 +67,9 @@ public class QueueManagerFactoryImpl implements LegacyQueueManagerFactory {
});
@Inject
- public QueueManagerFactoryImpl(final LegacyQueueFig queueFig, final LegacyQueueManagerInternalFactory queuemanagerInternalFactory){
+ public QueueManagerFactoryImpl(
+ final LegacyQueueFig queueFig, final LegacyQueueManagerInternalFactory queuemanagerInternalFactory) {
+
this.queueFig = queueFig;
this.queuemanagerInternalFactory = queuemanagerInternalFactory;
this.defaultManager = new HashMap<>(10);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/webapp/WEB-INF/web.xml b/stack/corepersistence/queue/src/main/webapp/WEB-INF/web.xml
new file mode 100644
index 0000000..cc94f07
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/webapp/WEB-INF/web.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+ <!-- Servlet 3.0 deployment descriptor for the "qakka" web application. -->
+ <web-app xmlns="http://java.sun.com/xml/ns/javaee"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
+ version="3.0">
+
+ <display-name>qakka</display-name>
+
+ <!-- Context listener registered for this web application. -->
+ <listener>
+ <listener-class>org.apache.usergrid.persistence.qakka.api.impl.StartupListener</listener-class>
+ </listener>
+
+ <!-- Jersey runs as a servlet filter, configured by the JerseyResourceConfig application class. -->
+ <filter>
+ <filter-name>qakka</filter-name>
+ <filter-class>org.glassfish.jersey.servlet.ServletContainer</filter-class>
+
+ <init-param>
+ <param-name>javax.ws.rs.Application</param-name>
+ <param-value>org.apache.usergrid.persistence.qakka.api.impl.JerseyResourceConfig</param-value>
+ </init-param>
+
+ <init-param>
+ <param-name>jersey.config.server.tracing.type</param-name>
+ <!-- allowed values are OFF, ON_DEMAND, ALL -->
+ <param-value>OFF</param-value>
+ </init-param>
+
+ </filter>
+
+ <!-- Only requests under /api/ are routed through the Jersey filter. -->
+ <filter-mapping>
+ <filter-name>qakka</filter-name>
+ <url-pattern>/api/*</url-pattern>
+ </filter-mapping>
+
+ </web-app>
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java
new file mode 100644
index 0000000..8f5284c
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/AbstractTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.core.migration.schema.MigrationException;
+import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
+import org.apache.usergrid.persistence.queue.guice.QueueModule;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+public class AbstractTest {
+ private static final Logger logger = LoggerFactory.getLogger( AbstractTest.class );
+
+ static AtomicInteger nextPort = new AtomicInteger(3551);
+
+ protected static Injector sharedInjector;
+
+ static { new KeyspaceDropper(); }
+
+
+ public AbstractTest() {
+ if ( getInjector() == null ) {
+ setInjector( Guice.createInjector( new QueueModule() ) );
+ MigrationManager migrationManager = getInjector().getInstance( MigrationManager.class );
+ try {
+ migrationManager.migrate();
+ } catch (MigrationException e) {
+ logger.error("Error in migration", e);
+ }
+ }
+ }
+
+ protected Injector getInjector() {
+ return sharedInjector;
+ }
+
+ protected static void setInjector(Injector injector) {
+ AbstractTest.sharedInjector = injector;
+ }
+
+
+ protected int getNextAkkaPort() {
+ int ret = nextPort.getAndIncrement();
+ logger.info("Returning port {} for this {}", ret, this.hashCode());
+ return ret;
+ }
+
+
+ @BeforeClass
+ public static void startCassandra() throws Exception {
+ //EmbeddedCassandraServerHelper.startEmbeddedCassandra("/cassandra.yaml");
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java
new file mode 100644
index 0000000..aa4dfd1
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.qakka;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+
+
+/**
+ * Created by Dave Johnson (snoopdave@apache.org) on 9/9/16.
+ */
+public class KeyspaceDropper {
+
+ private static final Logger logger = LoggerFactory.getLogger( AbstractTest.class );
+
+ static { dropTestKeyspace(); }
+
+
+ public static void dropTestKeyspace() {
+
+ String propsFileName = "qakka.properties";
+
+ Properties props = new Properties();
+ try {
+ props.load( App.class.getResourceAsStream( "/" + propsFileName ) );
+ } catch (IOException e) {
+ throw new RuntimeException( "Unable to load " + propsFileName + " file!" );
+ }
+
+ String keyspace = (String)props.get("cassandra.keyspace.application");
+ String hosts[] = props.getProperty( "cassandra.hosts", "127.0.0.1" ).split(",");
+ int port = Integer.parseInt( props.getProperty( "cassandra.port", "9042" ));
+
+ Cluster.Builder builder = Cluster.builder();
+ for ( String host : hosts ) {
+ builder = builder.addContactPoint( host ).withPort( port );
+ }
+
+ final QueryOptions queryOptions = new QueryOptions().setConsistencyLevel( ConsistencyLevel.LOCAL_QUORUM );
+ builder.withQueryOptions( queryOptions );
+ Cluster cluster = builder.build();
+
+ Session session = cluster.connect();
+ logger.info("Dropping test keyspace: {}", keyspace);
+ session.execute( "DROP KEYSPACE IF EXISTS " + keyspace );
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/AbstractRestTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/AbstractRestTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/AbstractRestTest.java
new file mode 100644
index 0000000..a7d6215
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/AbstractRestTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.api;
+
+
+import org.apache.usergrid.persistence.qakka.KeyspaceDropper;
+import org.apache.usergrid.persistence.qakka.api.impl.StartupListener;
+import org.apache.usergrid.persistence.qakka.api.impl.JerseyResourceConfig;
+import org.glassfish.jersey.test.DeploymentContext;
+import org.glassfish.jersey.test.JerseyTest;
+import org.glassfish.jersey.test.ServletDeploymentContext;
+import org.junit.BeforeClass;
+
+import javax.ws.rs.core.Application;
+
+
+ /**
+  * Base class for REST tests built on {@link JerseyTest}.
+  *
+  * Loading this class instantiates {@link KeyspaceDropper}. The JAX-RS application and the
+  * deployment context are created on first use and then shared through static fields.
+  */
+ abstract public class AbstractRestTest extends JerseyTest {
+
+     static Application app;
+
+     static DeploymentContext context = null;
+
+     static { new KeyspaceDropper(); }
+
+
+     /** Currently does nothing: the embedded Cassandra start-up below is commented out. */
+     @BeforeClass
+     public static void startCassandra() throws Exception {
+         //EmbeddedCassandraServerHelper.startEmbeddedCassandra("/cassandra.yaml");
+     }
+
+     /** @return the shared {@link JerseyResourceConfig}, created on the first call */
+     @Override
+     protected Application configure() {
+         if ( app != null ) {
+             return app;
+         }
+         app = new JerseyResourceConfig();
+         return app;
+     }
+
+     /**
+      * @return the shared servlet deployment context, built on the first call from
+      *         {@link #configure()} with {@link StartupListener} added as a listener
+      */
+     @Override
+     protected DeploymentContext configureDeployment() {
+         if ( context != null ) {
+             return context;
+         }
+         final Application application = configure();
+         context = ServletDeploymentContext.builder( application )
+             .addListener( StartupListener.class )
+             .build();
+         return context;
+     }
+ }
+
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/PerformanceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/PerformanceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/PerformanceTest.java
new file mode 100644
index 0000000..b99be69
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/PerformanceTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.api;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.core.QueueMessage;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.net.URISyntaxException;
+import java.util.*;
+
+
+ /**
+  * Manual performance test that sends messages to a queue over the REST API and then
+  * reads them back one request at a time, logging timing statistics for both phases.
+  * Ignored by default because it needs an externally running server.
+  */
+ public class PerformanceTest {
+     private static final Logger logger = LoggerFactory.getLogger( PerformanceTest.class );
+
+     /** System property that overrides the base URL of the API under test. */
+     private static final String BASE_URL_PROPERTY = "qakka.perftest.baseUrl";
+
+     /** Base URL used when {@link #BASE_URL_PROPERTY} is not set. */
+     private static final String DEFAULT_BASE_URL = "http://macsnoopdave2013:8080/api/";
+
+
+     /**
+      * Creates a randomly named queue, sends 20000 messages to it, then fetches 20000 times
+      * and fails if any queue-message id is seen twice or the number of distinct ids differs
+      * from the number sent.
+      */
+     @Test
+     @Ignore("needs exernal Tomcat an Cassandra")
+     public void testSendAndGetMessagePerformance() throws URISyntaxException, JsonProcessingException {
+
+         Client client = ClientBuilder.newClient();
+         try {
+             WebTarget target = client.target( System.getProperty( BASE_URL_PROPERTY, DEFAULT_BASE_URL ) );
+
+             // create a queue
+
+             String queueName = "pt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+             Map<String, Object> queueMap = new HashMap<String, Object>() {{ put("name", queueName); }};
+             target.path("queues").request()
+                 .post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE) )
+                 .close(); // response body is not needed; release the connection
+
+             int numMessages = 20000;
+
+             sendMessages( target, queueName, numMessages );
+             getMessagesCheckingForDups( target, queueName, numMessages );
+
+         } finally {
+             // the client holds connections and threads until it is closed
+             client.close();
+         }
+     }
+
+
+     /** Posts numMessages small JSON bodies to the queue and logs send-time statistics. */
+     private void sendMessages( WebTarget target, String queueName, int numMessages )
+         throws JsonProcessingException {
+
+         ObjectMapper mapper = new ObjectMapper();
+         List<Long> times = new ArrayList<>( numMessages );
+         int errorCount = 0;
+         int counter = 0;
+
+         for (int i = 0; i < numMessages; i++) {
+
+             final int number = i;
+             Map<String, Object> messageMap = new HashMap<String, Object>() {{
+                 put( "message", "this is message #" + number );
+                 put( "valid", true );
+             }};
+             String body = mapper.writeValueAsString( messageMap );
+
+             long startTime = System.currentTimeMillis();
+             Response post = target.path( "queues" ).path( queueName ).path( "messages" )
+                 .request().post( Entity.entity( body, MediaType.APPLICATION_OCTET_STREAM_TYPE ) );
+             long stopTime = System.currentTimeMillis();
+             times.add( stopTime - startTime );
+
+             try {
+                 if ( post.getStatus() != 200 ) {
+                     errorCount++;
+                 }
+             } finally {
+                 post.close(); // closed outside the timed section
+             }
+
+             if ( ++counter % 500 == 0 ) {
+                 logger.debug("Sent {} messages with error count {}", counter, errorCount);
+             }
+
+             // brief pause between sends; an interrupt only shortens the pause
+             try { Thread.sleep(5); } catch ( InterruptedException intentionallyIgnored ) {}
+         }
+
+         // one pass for sum, min, max and average
+         LongSummaryStatistics stats = times.stream().mapToLong( Long::longValue ).summaryStatistics();
+
+         logger.debug( "\n>>>>>>> Total send time {}ms, min {}ms, max {}ms, average {}ms errors {}\n\n",
+             stats.getSum(), stats.getMin(), stats.getMax(), stats.getAverage(), errorCount );
+     }
+
+
+     /**
+      * Issues numMessages GET requests, records the id of the first message in each
+      * successful response, fails on a repeated id and logs get-time statistics.
+      */
+     private void getMessagesCheckingForDups( WebTarget target, String queueName, int numMessages ) {
+
+         Set<UUID> messageIds = new HashSet<>();
+         List<Long> times = new ArrayList<>( numMessages );
+         int errorCount = 0;
+         int counter = 0;
+
+         for (int j = 0; j < numMessages; j++) {
+
+             long startTime = System.currentTimeMillis();
+             Response response = target.path( "queues" ).path( queueName ).path( "messages" ).request().get();
+             long stopTime = System.currentTimeMillis();
+             times.add( stopTime - startTime );
+
+             try {
+                 if ( ++counter % 500 == 0 ) {
+                     logger.debug("Got {} messages with error count {}", counter, errorCount);
+                 }
+
+                 if ( response.getStatus() != 200 ) {
+                     errorCount++;
+                     continue;
+                 }
+
+                 ApiResponse apiResponse = response.readEntity( ApiResponse.class );
+                 QueueMessage queueMessage = apiResponse.getQueueMessages().iterator().next();
+
+                 // Set.add returns false when the id was already present
+                 if ( !messageIds.add( queueMessage.getQueueMessageId() ) ) {
+                     Assert.fail( "Message fetched twice: " + queueMessage.getQueueMessageId() );
+                 }
+             } finally {
+                 response.close(); // also covers the error path, where the entity is never read
+             }
+         }
+         Assert.assertEquals( numMessages, messageIds.size() );
+
+         LongSummaryStatistics stats = times.stream().mapToLong( Long::longValue ).summaryStatistics();
+
+         logger.debug( "\n>>>>>>> Total get time {}ms, min {}ms, max {}ms, average {}ms errors {}\n\n",
+             stats.getSum(), stats.getMin(), stats.getMax(), stats.getAverage(), errorCount );
+     }
+ }
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
new file mode 100644
index 0000000..fcb4212
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.api;
+
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.io.ByteStreams;
+import com.google.inject.Injector;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.api.impl.StartupListener;
+import org.apache.usergrid.persistence.qakka.core.QueueMessage;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.util.*;
+
+import static org.junit.Assert.fail;
+
+
+public class QueueResourceTest extends AbstractRestTest {
+    /** Logger for this test class. */
+    private static final Logger logger =
+        LoggerFactory.getLogger( QueueResourceTest.class );
+
+    /** Jackson type token for reading a JSON payload into a String-keyed map. */
+    private static final TypeReference<Map<String,Object>> jsonMapTypeRef =
+        new TypeReference<Map<String,Object>>() {};
+
+    /**
+     * Creates a queue with a POST to "queues", checks the 201 status and the "location"
+     * header, then reads the queue back through "queues/{name}/config".
+     */
+    @Test
+    public void testCreateQueue() throws URISyntaxException {
+
+        final String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+
+        // POST the new queue definition
+        Map<String, Object> createRequest = new HashMap<String, Object>() {{
+            put("name", queueName);
+        }};
+        Response createResponse = target("queues")
+            .request()
+            .post( Entity.entity( createRequest, MediaType.APPLICATION_JSON_TYPE));
+        Assert.assertEquals( 201, createResponse.getStatus() );
+
+        // the location header must match what the URI strategy produces for this queue
+        URIStrategy uris = StartupListener.INJECTOR.getInstance( URIStrategy.class );
+        String expectedLocation = uris.queueURI( queueName ).toString();
+        Assert.assertEquals( expectedLocation, createResponse.getHeaderString( "location" ) );
+
+        // the queue config must now be retrievable by name
+        Response configResponse = target("queues")
+            .path( queueName )
+            .path( "config" )
+            .request()
+            .get();
+        Assert.assertEquals( 200, configResponse.getStatus() );
+
+        ApiResponse configBody = configResponse.readEntity( ApiResponse.class );
+        Assert.assertNotNull( configBody.getQueues() );
+        Assert.assertFalse( configBody.getQueues().isEmpty() );
+        Assert.assertEquals( 1, configBody.getQueues().size() );
+        Assert.assertEquals( queueName, configBody.getQueues().iterator().next().getName() );
+    }
+
+
+    /**
+     * Creates a queue and verifies that DELETE without confirm=true is rejected with 400,
+     * that a confirmed DELETE returns 200, and that the queue config lookup then returns 404.
+     */
+    @Test
+    public void testDeleteQueue() throws URISyntaxException {
+
+        final String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+
+        // set-up: create the queue that will be deleted
+        Map<String, Object> createRequest = new HashMap<String, Object>() {{ put("name", queueName); }};
+        Response createResponse = target("queues")
+            .request()
+            .post( Entity.entity( createRequest, MediaType.APPLICATION_JSON_TYPE));
+        Assert.assertEquals( 201, createResponse.getStatus() );
+
+        URIStrategy uris = StartupListener.INJECTOR.getInstance( URIStrategy.class );
+        String expectedLocation = uris.queueURI( queueName ).toString();
+        Assert.assertEquals( expectedLocation, createResponse.getHeaderString( "location" ) );
+
+        // delete without confirm = true must fail with bad request
+        Response unconfirmedDelete = target("queues").path( queueName ).request().delete();
+        Assert.assertEquals( 400, unconfirmedDelete.getStatus() );
+
+        // delete with confirm = true succeeds
+        Response confirmedDelete = target("queues")
+            .path( queueName )
+            .queryParam( "confirm", true )
+            .request()
+            .delete();
+        Assert.assertEquals( 200, confirmedDelete.getStatus() );
+
+        // the queue can no longer be looked up by name
+        Response configLookup = target("queues").path( queueName ).path( "config" ).request().get();
+        Assert.assertEquals( 404, configLookup.getStatus() );
+    }
+
+
+    /** Posting a message to a queue name that this test never creates must return 404. */
+    @Test
+    public void testSendMessageToBadQueue() throws URISyntaxException, JsonProcessingException, InterruptedException {
+
+        final String missingQueue = "bogus_queue_is_bogus";
+
+        Map<String, Object> payload = new HashMap<String, Object>() {{ put("dummy_prop", "dummy_value"); }};
+        String json = new ObjectMapper().writeValueAsString( payload );
+
+        Response postResponse = target("queues")
+            .path( missingQueue )
+            .path( "messages" )
+            .request()
+            .post( Entity.entity( json, MediaType.APPLICATION_OCTET_STREAM_TYPE ));
+
+        Assert.assertEquals( 404, postResponse.getStatus() );
+    }
+
+
+    /** Runs the send/receive round trip with the messages posted as 'application/json'. */
+    @Test
+    public void testSendJsonMessagesAsJson() throws URISyntaxException, IOException, InterruptedException {
+        final boolean postAsJson = true;
+        sendJsonMessages( postAsJson );
+    }
+
+
+    /** Runs the send/receive round trip with the JSON messages posted as an octet stream. */
+    @Test
+    public void testSendMessagesJsonAsOctetStream() throws URISyntaxException, IOException, InterruptedException {
+        final boolean postAsJson = false;
+        sendJsonMessages( postAsJson );
+    }
+
+
+    /**
+     * Creates a fresh queue, sends 100 JSON payload messages to it, then reads them all
+     * back through {@link #checkJsonMessages(String, int)}, which fails on duplicates.
+     *
+     * @param asJson True to send with content-type header 'application/json'
+     *               False to send with content-type header 'application/octet-stream'
+     *               and 'application/json' passed in the 'contentType' query parameter
+     */
+    private void sendJsonMessages( boolean asJson ) throws URISyntaxException, IOException, InterruptedException {
+
+        // create a queue; fail here if creation did not succeed, rather than on every send below
+
+        String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        Map<String, Object> queueMap = Collections.<String, Object>singletonMap( "name", queueName );
+        Response createResponse = target( "queues" ).request()
+            .post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE ) );
+        Assert.assertEquals( 201, createResponse.getStatus() );
+
+        // send some messages
+
+        ObjectMapper mapper = new ObjectMapper();
+
+        int numMessages = 100;
+        for (int i = 0; i < numMessages; i++) {
+
+            // plain map instead of double-brace initialization (no anonymous subclass per message)
+            Map<String, Object> messageMap = new HashMap<>();
+            messageMap.put( "message", "this is message #" + i );
+            messageMap.put( "valid", true );
+            String body = mapper.writeValueAsString( messageMap );
+
+            Response response;
+            if ( asJson ) {
+                response = target( "queues" ).path( queueName ).path( "messages" )
+                    .request().post( Entity.entity( body, MediaType.APPLICATION_JSON ) );
+            } else {
+                // octet-stream body; the real content type travels in the query string
+                response = target( "queues" ).path( queueName ).path( "messages" )
+                    .queryParam( "contentType", MediaType.APPLICATION_JSON )
+                    .request().post( Entity.entity( body, MediaType.APPLICATION_OCTET_STREAM ) );
+            }
+
+            Assert.assertEquals( 200, response.getStatus() );
+        }
+
+        // get all messages, checking for dups
+
+        checkJsonMessages( queueName, numMessages );
+    }
+
+
+ private Set<UUID> checkJsonMessages( String queueName, int numMessages ) throws IOException {
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ Set<UUID> messageIds = new HashSet<>();
+ for ( int j=0; j<numMessages; j++ ) {
+
+ int retries = 0;
+ int maxRetries = 10;
+ ApiResponse apiResponse = null;
+ while ( retries++ < maxRetries ) {
+ Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get();
+ apiResponse = response.readEntity( ApiResponse.class );
+ if ( !apiResponse.getQueueMessages().isEmpty() ) {
+ break;
+ }
+ try { Thread.sleep(500); } catch (Exception ignored) {}
+ }
+
+ Assert.assertNotNull( apiResponse );
+ Assert.assertNotNull( apiResponse.getQueueMessages() );
+ Assert.assertEquals( 1, apiResponse.getQueueMessages().size() );
+
+ QueueMessage queueMessage = apiResponse.getQueueMessages().iterator().next();
+ Map<String, Object> payload = mapper.readValue( queueMessage.getData(), jsonMapTypeRef );
+
+ Assert.assertEquals( queueName, queueMessage.getQueueName() );
+ Assert.assertNull( queueMessage.getHref() );
+ Assert.assertEquals( true, payload.get("valid") );
+
+ if (messageIds.contains( queueMessage.getQueueMessageId() )) {
+ Assert.fail("Message fetched twice: " + queueMessage.getQueueMessageId() );
+ } else {
+ messageIds.add( queueMessage.getQueueMessageId() );
+ }
+ }
+ Assert.assertEquals( numMessages, messageIds.size() );
+
+ return messageIds;
+ }
+
+
+    /**
+     * Creates a fresh queue, posts the classpath image /qakka-duck.jpg to it 100 times as
+     * an octet stream tagged with contentType=image/jpg, then reads every message back
+     * through {@link #checkBinaryMessages(String, int)}.
+     */
+    @Test
+    public void testSendBinaryMessages() throws URISyntaxException, IOException, InterruptedException {
+
+        // create a queue; fail here if creation did not succeed
+
+        String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        Map<String, Object> queueMap = Collections.<String, Object>singletonMap( "name", queueName );
+        Response createResponse = target( "queues" ).request()
+            .post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE ) );
+        Assert.assertEquals( 201, createResponse.getStatus() );
+
+        // load the image payload, closing the resource stream once it has been read
+
+        byte[] bytes;
+        try ( InputStream is = getClass().getResourceAsStream("/qakka-duck.jpg") ) {
+            Assert.assertNotNull( "Test resource /qakka-duck.jpg not found on classpath", is );
+            bytes = ByteStreams.toByteArray( is );
+        }
+
+        // send messages each with image/jpg payload
+
+        int numMessages = 100;
+        for (int i = 0; i < numMessages; i++) {
+
+            Response response = target( "queues" ).path( queueName ).path( "messages" )
+                .queryParam( "contentType", "image/jpg" )
+                .request()
+                .post( Entity.entity( bytes, MediaType.APPLICATION_OCTET_STREAM ));
+
+            Assert.assertEquals( 200, response.getStatus() );
+        }
+
+        // get all messages, checking for dups
+
+        checkBinaryMessages( queueName, numMessages );
+    }
+
+
+ private Set<UUID> checkBinaryMessages( String queueName, int numMessages ) throws IOException {
+
+ Set<UUID> messageIds = new HashSet<>();
+ for ( int j=0; j<numMessages; j++ ) {
+
+ Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get();
+
+ ApiResponse apiResponse = response.readEntity( ApiResponse.class );
+ Assert.assertNotNull( apiResponse.getQueueMessages() );
+ Assert.assertFalse( apiResponse.getQueueMessages().isEmpty() );
+ Assert.assertEquals( 1, apiResponse.getQueueMessages().size() );
+
+ QueueMessage queueMessage = apiResponse.getQueueMessages().iterator().next();
+
+ // no data in a binary message
+ Assert.assertNull( queueMessage.getData() );
+
+ // data can be found at HREF provided
+ Assert.assertNotNull( queueMessage.getHref() );
+
+ Response binaryResponse = target("queues")
+ .path( queueName ).path("data").path( queueMessage.getQueueMessageId().toString() )
+ .request().accept( "image/jpg" ).get();
+
+ Assert.assertEquals( 200, binaryResponse.getStatus() );
+ InputStream is = binaryResponse.readEntity( InputStream.class );
+
+ byte[] imageBytes = ByteStreams.toByteArray( is );
+ Assert.assertEquals( 11188, imageBytes.length);
+
+ if (messageIds.contains( queueMessage.getQueueMessageId() )) {
+ fail("Message fetched twice: " + queueMessage.getQueueMessageId() );
+ } else {
+ messageIds.add( queueMessage.getQueueMessageId() );
+ }
+ }
+ Assert.assertEquals( numMessages, messageIds.size() );
+
+ return messageIds;
+ }
+
+
+    /**
+     * Sends 100 JSON messages, fetches them all, acks half of them with DELETE, sleeps for
+     * twice the configured queue timeout, then verifies that the un-acked half can no
+     * longer be acked (400) and can be fetched from the queue again.
+     */
+    @Test
+    public void testSendMessageAckAndTimeout() throws URISyntaxException, IOException, InterruptedException {
+
+        // create a queue; fail here if creation did not succeed
+
+        String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+        Map<String, Object> queueMap = Collections.<String, Object>singletonMap( "name", queueName );
+        Response createResponse = target("queues").request()
+            .post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE));
+        Assert.assertEquals( 201, createResponse.getStatus() );
+
+        // send some messages
+
+        ObjectMapper mapper = new ObjectMapper();
+
+        int numMessages = 100;
+        for ( int i=0; i<numMessages; i++ ) {
+
+            Map<String, Object> messageMap = new HashMap<>();
+            messageMap.put( "message", "this is message #" + i );
+            messageMap.put( "valid", true );
+            String body = mapper.writeValueAsString( messageMap );
+
+            Response sendResponse = target("queues").path( queueName ).path( "messages" )
+                .request().post( Entity.entity( body, MediaType.APPLICATION_JSON ));
+
+            Assert.assertEquals( 200, sendResponse.getStatus() );
+        }
+
+        // get all messages, checking for dups
+
+        Set<UUID> messageIds = checkJsonMessages( queueName, numMessages );
+
+        // there should be no more messages available
+
+        Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get();
+        ApiResponse apiResponse = response.readEntity( ApiResponse.class );
+        Assert.assertNotNull( apiResponse.getQueueMessages() );
+        Assert.assertTrue( apiResponse.getQueueMessages().isEmpty() );
+
+        // ack half of the messages
+
+        int count = 0;
+        Set<UUID> ackedIds = new HashSet<>();
+        for ( UUID queueMessageId : messageIds ) {
+            response = target( "queues" )
+                .path( queueName ).path( "messages" ).path( queueMessageId.toString() ).request().delete();
+            Assert.assertEquals( 200, response.getStatus() );
+            ackedIds.add( queueMessageId );
+            if ( ++count >= numMessages/2 ) {
+                break;
+            }
+        }
+        // from here on messageIds holds only the un-acked messages
+        messageIds.removeAll( ackedIds );
+
+        // wait for the remaining messages to time out; the long literals keep the
+        // millisecond computation in long arithmetic whatever the getter's numeric type
+
+        QakkaFig qakkaFig = StartupListener.INJECTOR.getInstance( QakkaFig.class );
+        Thread.sleep( 2L * qakkaFig.getQueueTimeoutSeconds() * 1000L );
+
+        // now, the remaining messages cannot be acked because they timed out
+
+        for ( UUID queueMessageId : messageIds ) {
+            response = target( "queues" )
+                .path( queueName ).path( "messages" ).path( queueMessageId.toString() ).request().delete();
+            Assert.assertEquals( 400, response.getStatus() );
+        }
+
+        // and, those same messages should be available again in the queue
+
+        checkJsonMessages( queueName, numMessages/2 );
+    }
+
+
+ @Test
+ public void testConvertDelayParameter() {
+
+ Injector injector = StartupListener.INJECTOR;
+ QueueResource queueResource = injector.getInstance( QueueResource.class );
+
+ Assert.assertEquals( 0L, queueResource.convertDelayParameter( "" ).longValue() );
+ Assert.assertEquals( 0L, queueResource.convertDelayParameter( "0" ).longValue() );
+ Assert.assertEquals( 0L, queueResource.convertDelayParameter( "NONE" ).longValue() );
+ Assert.assertEquals( 5L, queueResource.convertDelayParameter( "5" ).longValue() );
+
+ try {
+ queueResource.convertDelayParameter( "bogus value" );
+ fail("Expected exception on bad value");
+ } catch ( IllegalArgumentException expected ) {
+ // pass
+ }
+ }
+
+ @Test
+ public void testConvertExpirationParameter() {
+
+ Injector injector = StartupListener.INJECTOR;
+ QueueResource queueResource = injector.getInstance( QueueResource.class );
+
+ Assert.assertNull( queueResource.convertExpirationParameter( "" ) );
+ Assert.assertNull( queueResource.convertExpirationParameter( "NEVER" ) );
+
+ Assert.assertEquals( 5L, queueResource.convertExpirationParameter( "5" ).longValue() );
+
+ try {
+ queueResource.convertExpirationParameter( "bogus value" );
+ fail("Expected exception on bad value");
+ } catch ( IllegalArgumentException expected ) {
+ // pass
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
new file mode 100644
index 0000000..42423fa
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.common;
+
+import com.datastax.driver.core.Session;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.junit.Test;
+
+
+/**
+ * Smoke test for obtaining a Cassandra session through the injected client.
+ *
+ * Created by russo on 6/8/16.
+ */
+public class CassandraClientTest extends AbstractTest {
+
+    /**
+     * Resolves {@link CassandraClientImpl} from the injector, obtains its session and asks
+     * it for the logged keyspace. Nothing is asserted; the test passes when no call throws.
+     */
+    @Test
+    public void getClient() {
+
+        final CassandraClient client = getInjector().getInstance( CassandraClientImpl.class );
+        final Session cassandraSession = client.getSession();
+
+        // the return value is not checked
+        cassandraSession.getLoggedKeyspace();
+    }
+}