You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/16 22:34:39 UTC
[10/25] usergrid git commit: Initial integration of Qakka into
Usergrid Queue module,
and implementation of Qakka-based LegacyQueueManager implementation.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaException.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaException.java
new file mode 100644
index 0000000..84e0681
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.exceptions;
+
+
+public class QakkaException extends Exception {
+
+ public QakkaException(String message) {
+ super( message );
+ }
+
+ public QakkaException(String message, Throwable cause) {
+ super( message, cause );
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaRuntimeException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaRuntimeException.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaRuntimeException.java
new file mode 100644
index 0000000..fbb788b
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/exceptions/QakkaRuntimeException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.exceptions;
+
+
+public class QakkaRuntimeException extends RuntimeException {
+
+ public QakkaRuntimeException(String message) {
+ super( message );
+ }
+
+ public QakkaRuntimeException(String message, Throwable cause) {
+ super( message, cause );
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
new file mode 100644
index 0000000..1c733a6
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.QueueMessageSerializationImpl;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+import static org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.QueueMessageSerializationImpl.*;
+
+
+public class MultiShardMessageIterator implements Iterator<DatabaseQueueMessage> {
+
+ private static final Logger logger = LoggerFactory.getLogger( MultiShardMessageIterator.class );
+
+ private final CassandraClient cassandraClient;
+
+
+ private final int PAGE_SIZE = 100;
+ private final String queueName;
+ private final String region;
+ private final DatabaseQueueMessage.Type messageType;
+ private final Iterator<Shard> shardIterator;
+
+ private Iterator<DatabaseQueueMessage> currentIterator;
+ private Shard currentShard;
+ private UUID nextStart;
+
+
+ public MultiShardMessageIterator(
+ final CassandraClient cassandraClient,
+ final String queueName,
+ final String region,
+ final DatabaseQueueMessage.Type messageType,
+ final Iterator<Shard> shardIterator,
+ final UUID nextStart) {
+
+ this.queueName = queueName;
+ this.region = region;
+ this.messageType = messageType;
+ this.shardIterator = shardIterator;
+ this.nextStart = nextStart;
+ this.cassandraClient = cassandraClient;
+
+ if (shardIterator == null) {
+ throw new RuntimeException("shardIterator cannot be null");
+ }
+
+ }
+
+ @Override
+ public boolean hasNext() {
+
+ if ( shardIterator.hasNext() && currentIterator == null) {
+ advance();
+ }
+
+ if ( shardIterator.hasNext() && !currentIterator.hasNext()) {
+ advance();
+ }
+
+ if ( !shardIterator.hasNext() && ( currentIterator == null || !currentIterator.hasNext()) ) {
+ advance();
+ }
+
+ return currentIterator.hasNext();
+ }
+
+ @Override
+ public DatabaseQueueMessage next() {
+
+ if ( !hasNext() ) {
+ throw new NoSuchElementException( "No next message exists" );
+ }
+
+ return currentIterator.next();
+ }
+
+ private void advance(){
+
+ if (currentShard == null){
+ currentShard = shardIterator.next();
+ }
+
+ Clause queueNameClause = QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName);
+ Clause regionClause = QueryBuilder.eq( COLUMN_REGION, region);
+ Clause shardIdClause = QueryBuilder.eq( COLUMN_SHARD_ID, currentShard.getShardId());
+
+ // if we have a pointer from the shard and this is the first seek, init from the pointer's position
+ if ( currentShard.getPointer() != null && nextStart == null ){
+ nextStart = currentShard.getPointer();
+ }
+
+ Statement query;
+
+ if ( nextStart == null) {
+
+ query = QueryBuilder.select().all().from(QueueMessageSerializationImpl.getTableName(messageType))
+ .where(queueNameClause)
+ .and(regionClause)
+ .and(shardIdClause)
+ .limit(PAGE_SIZE);
+ } else {
+
+ Clause messageIdClause = QueryBuilder.gt( COLUMN_QUEUE_MESSAGE_ID, nextStart);
+ query = QueryBuilder.select().all().from(QueueMessageSerializationImpl.getTableName(messageType))
+ .where(queueNameClause)
+ .and(regionClause)
+ .and(shardIdClause)
+ .and(messageIdClause)
+ .limit(PAGE_SIZE);
+ }
+
+ List<Row> rows = cassandraClient.getSession().execute(query).all();
+
+ if ( (rows == null || rows.size() == 0) && shardIterator.hasNext()) {
+
+ currentShard = shardIterator.next();
+ advance();
+
+ } else {
+
+ currentIterator = getIteratorFromRows(rows);
+
+ }
+ }
+
+
+ private Iterator<DatabaseQueueMessage> getIteratorFromRows(List<Row> rows){
+
+ List<DatabaseQueueMessage> messages = new ArrayList<>(rows.size());
+
+ rows.forEach(row -> {
+
+ final String queueName = row.getString( COLUMN_QUEUE_NAME);
+ final String region = row.getString( COLUMN_REGION);
+ final long shardId = row.getLong( COLUMN_SHARD_ID);
+ final UUID queueMessageId = row.getUUID( COLUMN_QUEUE_MESSAGE_ID);
+ final UUID messageId = row.getUUID( COLUMN_MESSAGE_ID);
+ final long queuedAt = row.getLong( COLUMN_QUEUED_AT);
+ final long inflightAt = row.getLong( COLUMN_INFLIGHT_AT);
+
+ messages.add(new DatabaseQueueMessage(
+ messageId, messageType, queueName, region, shardId, queuedAt, inflightAt, queueMessageId));
+
+ //queueMessageId is internal to the messages_available and messages_inflight tables
+ nextStart = queueMessageId;
+
+ });
+
+ return messages.iterator();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/Result.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/Result.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/Result.java
new file mode 100644
index 0000000..b2b0934
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/Result.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization;
+
+import com.datastax.driver.core.PagingState;
+
+import java.util.List;
+
+
+/**
+ * Created by Dave Johnson (snoopdave@apache.org) on 8/8/16.
+ */
+public interface Result<T> {
+ PagingState getPagingState();
+
+ List<T> getEntities();
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLog.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLog.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLog.java
new file mode 100644
index 0000000..dcf0d1a
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLog.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.auditlog;
+
+import java.util.UUID;
+
+
+public class AuditLog {
+
+ public enum Status { SUCCESS, ERROR };
+
+ public enum Action { SEND, ACK, GET, REQUEUE };
+
+ Action action;
+ Status status;
+ String queueName;
+ String region;
+ UUID messageId;
+
+ UUID queueMessageId;
+ long transfer_time;
+
+ public AuditLog(
+ Action action,
+ Status status,
+ String queueName,
+ String region,
+ UUID messageId,
+ UUID queueMessageId,
+ long transfer_time) {
+
+ this.action = action;
+ this.status = status;
+ this.queueName = queueName;
+ this.region = region;
+ this.messageId = messageId;
+ this.queueMessageId = queueMessageId;
+ this.transfer_time = transfer_time;
+ }
+
+ public Action getAction() {
+ return action;
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public void setQueueName(String queueName) {
+ this.queueName = queueName;
+ }
+
+ public String getRegion() {
+ return region;
+ }
+
+ public void setRegion(String region) {
+ this.region = region;
+ }
+
+ public UUID getMessageId() {
+ return messageId;
+ }
+
+ public void setMessageId(UUID messageId) {
+ this.messageId = messageId;
+ }
+
+ public long getTransfer_time() {
+ return transfer_time;
+ }
+
+ public void setTransfer_time(long transfer_time) {
+ this.transfer_time = transfer_time;
+ }
+
+ public UUID getQueueMessageId() {
+ return queueMessageId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLogSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLogSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLogSerialization.java
new file mode 100644
index 0000000..95f2dbe
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/AuditLogSerialization.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.auditlog;
+
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+import org.apache.usergrid.persistence.qakka.serialization.Result;
+
+import java.util.UUID;
+
+
+public interface AuditLogSerialization extends Migration {
+
+ /**
+ * Record audit log record.
+ */
+ void recordAuditLog(
+ AuditLog.Action action,
+ AuditLog.Status status,
+ String queueName,
+ String region,
+ UUID messageId,
+ UUID queueMessageId);
+
+ /**
+ * Get all audit logs for a specific queue message.
+ */
+ Result<AuditLog> getAuditLogs(UUID messageId);
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java
new file mode 100644
index 0000000..d9dbab6
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.auditlog.impl;
+
+import com.datastax.driver.core.PagingState;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
+import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.serialization.Result;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+
+public class AuditLogSerializationImpl implements AuditLogSerialization {
+
+ private static final Logger logger = LoggerFactory.getLogger( AuditLogSerializationImpl.class );
+
+ private final CassandraClient cassandraClient;
+
+ public final static String TABLE_AUDIT_LOG = "audit_log";
+
+ public final static String COLUMN_ACTION = "action";
+ public final static String COLUMN_STATUS = "status";
+ public final static String COLUMN_QUEUE_NAME = "queue_name";
+ public final static String COLUMN_REGION = "region";
+ public final static String COLUMN_MESSAGE_ID = "message_id";
+ public final static String COLUMN_QUEUE_MESSAGE_ID = "queue_message_id";
+ public final static String COLUMN_TRANSFER_TIME = "transfer_time";
+
+
+ // design note: want to be able to query this by message_id, so we can do "garbage collection"
+ // of message data items that have been processed in all target regions
+
+ static final String CQL =
+ "CREATE TABLE IF NOT EXISTS audit_log ( " +
+ "action text, " +
+ "status text, " +
+ "queue_name text, " +
+ "region text, " +
+ "message_id timeuuid, " +
+ "queue_message_id timeuuid, " +
+ "transfer_time bigint, " +
+ "PRIMARY KEY (message_id, transfer_time) " +
+ ") WITH CLUSTERING ORDER BY (transfer_time ASC); ";
+
+
+ @Inject
+ public AuditLogSerializationImpl( CassandraClient cassandraClient ) {
+ this.cassandraClient = cassandraClient;
+ }
+
+
+ @Override
+ public void recordAuditLog(
+ AuditLog.Action action,
+ AuditLog.Status status,
+ String queueName,
+ String region,
+ UUID messageId,
+ UUID queueMessageId ) {
+
+ Statement insert = QueryBuilder.insertInto(TABLE_AUDIT_LOG)
+ .value(COLUMN_ACTION, action.toString() )
+ .value(COLUMN_STATUS, status.toString() )
+ .value(COLUMN_QUEUE_NAME, queueName )
+ .value(COLUMN_REGION, region )
+ .value(COLUMN_MESSAGE_ID, messageId )
+ .value(COLUMN_QUEUE_MESSAGE_ID, queueMessageId )
+ .value(COLUMN_TRANSFER_TIME, System.currentTimeMillis() );
+ cassandraClient.getSession().execute(insert);
+ }
+
+
+ @Override
+ public Result<AuditLog> getAuditLogs( UUID messageId ) {
+
+ Statement query = QueryBuilder.select().all().from(TABLE_AUDIT_LOG)
+ .where( QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId ) );
+
+ ResultSet rs = cassandraClient.getSession().execute( query );
+
+ final List<AuditLog> auditLogs = rs.all().stream().map( row ->
+ new AuditLog(
+ AuditLog.Action.valueOf( row.getString( COLUMN_ACTION )),
+ AuditLog.Status.valueOf( row.getString( COLUMN_STATUS )),
+ row.getString( COLUMN_QUEUE_NAME ),
+ row.getString( COLUMN_REGION ),
+ row.getUUID( COLUMN_MESSAGE_ID ),
+ row.getUUID( COLUMN_QUEUE_MESSAGE_ID ),
+ row.getLong( COLUMN_TRANSFER_TIME ) )
+ ).collect( Collectors.toList() );
+
+ return new Result<AuditLog>() {
+
+ @Override
+ public PagingState getPagingState() {
+ return null; // no paging
+ }
+
+ @Override
+ public List<AuditLog> getEntities() {
+ return auditLogs;
+ }
+ };
+
+ }
+
+
+ @Override
+ public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
+ return Collections.EMPTY_LIST;
+ }
+
+ @Override
+ public Collection<TableDefinition> getTables() {
+ return Collections.singletonList( new TableDefinitionStringImpl( TABLE_AUDIT_LOG, CQL ) );
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java
new file mode 100644
index 0000000..dab47d5
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queuemessages;
+
+import java.util.UUID;
+
+
+public class DatabaseQueueMessage {
+
+ public enum Type {
+ DEFAULT, INFLIGHT
+ }
+
+ private final String queueName;
+ private final String region;
+ private final Long queuedAt;
+ private final UUID messageId;
+ private final UUID queueMessageId;
+
+ private Type type;
+ private Long inflightAt;
+
+ private Long shardId;
+
+
+ public DatabaseQueueMessage(
+ final UUID messageId,
+ final Type type,
+ final String queueName,
+ final String region,
+ final Long shardId,
+ final Long queuedAt,
+ final Long inflightAt,
+ UUID queueMessageId){
+
+ this.messageId = messageId;
+ this.type = type;
+ this.queueName = queueName;
+ this.region = region;
+ this.shardId = shardId;
+ this.queuedAt = queuedAt;
+ this.inflightAt = inflightAt;
+ this.queueMessageId = queueMessageId;
+
+ }
+
+ public void setType(Type type) {
+ this.type = type;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public String getRegion() {
+ return region;
+ }
+
+ public Long getShardId() {
+ return shardId;
+ }
+
+ public UUID getMessageId() {
+ return messageId;
+ }
+
+ public Type getType() {
+ return type;
+ }
+
+ public Long getQueuedAt() {
+ return queuedAt;
+ }
+
+ public UUID getQueueMessageId() {
+ return queueMessageId;
+ }
+
+ public Long getInflightAt() {
+ return inflightAt;
+ }
+
+ public void setInflightAt(Long inflightAt) {
+ this.inflightAt = inflightAt;
+ }
+
+ public void setShardId(Long shardId) {
+ this.shardId = shardId;
+ }
+
+
+
+ @Override
+ public int hashCode() {
+ int result = queueName.hashCode();
+ result = ( 31 * result ) + region.hashCode();
+ result = ( 31 * result ) + (int)( shardId != null ? shardId : 0L );
+ result = ( 31 * result ) + messageId.hashCode();
+ result = ( 31 * result ) + type.hashCode();
+
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+
+ if( this == obj){
+ return true;
+ }
+
+ if( !(obj instanceof DatabaseQueueMessage)){
+ return false;
+ }
+
+ DatabaseQueueMessage that = (DatabaseQueueMessage) obj;
+
+ if( !this.queueName.equalsIgnoreCase(that.queueName)){
+ return false;
+ }
+ if( !this.region.equalsIgnoreCase(that.region)){
+ return false;
+ }
+ if( this.shardId != that.shardId){
+ return false;
+ }
+ if( !messageId.equals(that.messageId)){
+ return false;
+ }
+ if( !type.equals(that.type)){
+ return false;
+ }
+
+ return true;
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageBody.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageBody.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageBody.java
new file mode 100644
index 0000000..c4c7fce
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageBody.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queuemessages;
+
+import com.google.common.base.Preconditions;
+
+import java.nio.ByteBuffer;
+
+
+public class DatabaseQueueMessageBody {
+
+
+ private final ByteBuffer blob;
+ private final String contentType;
+
+
+ public DatabaseQueueMessageBody(final ByteBuffer blob, final String contentType){
+
+ Preconditions.checkNotNull(blob, "Blob data cannot be null");
+
+ this.blob = blob;
+ this.contentType = contentType;
+
+ }
+
+ public ByteBuffer getBlob() {
+ return blob;
+ }
+
+ public String getContentType() {
+ return contentType;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
new file mode 100644
index 0000000..cbbf11f
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.qakka.serialization.queuemessages;
+
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+
+
+public interface MessageCounterSerialization extends Migration {
+
+ void incrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment);
+
+ void decrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment);
+
+ long getCounterValue(String name, DatabaseQueueMessage.Type type);
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
new file mode 100644
index 0000000..3ebe735
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queuemessages;
+
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+
+import java.util.UUID;
+
+
+public interface QueueMessageSerialization extends Migration {
+
+ /**
+ * Write message to storage..
+ * If queueMessageId or createdTime are null, then values will be generated.
+ */
+ UUID writeMessage(final DatabaseQueueMessage message);
+
+ DatabaseQueueMessage loadMessage(
+ final String queueName,
+ final String region,
+ final Long shardIdOrNull,
+ final DatabaseQueueMessage.Type type,
+ final UUID queueMessageId);
+
+ void deleteMessage(
+ final String queueName,
+ final String region,
+ final Long shardIdOrNull,
+ final DatabaseQueueMessage.Type type,
+ final UUID queueMessageId);
+
+ void writeMessageData(final UUID messageId, final DatabaseQueueMessageBody messageBody);
+
+ DatabaseQueueMessageBody loadMessageData(final UUID messageId);
+
+ void deleteMessageData(final UUID messageId);
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
new file mode 100644
index 0000000..65ffc47
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
+import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.MessageFormat;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+@Singleton
+public class MessageCounterSerializationImpl implements ShardCounterSerialization {
+ private static final Logger logger = LoggerFactory.getLogger( MessageCounterSerializationImpl.class );
+
+ private final CassandraClient cassandraClient;
+
+ final static String TABLE_SHARD_COUNTERS = "counters";
+ final static String COLUMN_QUEUE_NAME = "queue_name";
+ final static String COLUMN_SHARD_ID = "shard_id";
+ final static String COLUMN_COUNTER_VALUE = "counter_value";
+ final static String COLUMN_SHARD_TYPE = "shard_type";
+
+ // design note: counters based on DataStax example here:
+ // https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_counter_t.html
+
+ static final String CQL =
+ "CREATE TABLE IF NOT EXISTS shard_counters ( " +
+ "counter_value counter, " +
+ "queue_name varchar, " +
+ "shard_type varchar, " +
+ "shard_id bigint, " +
+ "PRIMARY KEY (queue_name, shard_type, shard_id) " +
+ ");";
+
+
+ final long maxInMemoryIncrement;
+
+ class InMemoryCount {
+ long baseCount;
+ final AtomicLong increment = new AtomicLong( 0L );
+ InMemoryCount( long baseCount ) {
+ this.baseCount = baseCount;
+ }
+ public long value() {
+ return baseCount + increment.get();
+ }
+ public AtomicLong getIncrement() {
+ return increment;
+ }
+ void setBaseCount( long baseCount ) {
+ this.baseCount = baseCount;
+ }
+ }
+
+ private Map<String, InMemoryCount> inMemoryCounters = new ConcurrentHashMap<>(200);
+
+
+ @Inject
+ public MessageCounterSerializationImpl(QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+ this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter();
+ this.cassandraClient = cassandraClient;
+ }
+
+
+ @Override
+ public void incrementCounter(String queueName, Shard.Type type, long shardId, long increment ) {
+
+ String key = queueName + type + shardId;
+ synchronized ( inMemoryCounters ) {
+
+ if ( inMemoryCounters.get( key ) == null ) {
+
+ Long value = retrieveCounterFromStorage( queueName, type, shardId );
+
+ if ( value == null ) {
+ incrementCounterInStorage( queueName, type, shardId, 0L );
+ inMemoryCounters.put( key, new InMemoryCount( 0L ));
+ } else {
+ inMemoryCounters.put( key, new InMemoryCount( value ));
+ }
+ inMemoryCounters.get( key ).getIncrement().addAndGet( increment );
+ return;
+ }
+ }
+
+ InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+
+ synchronized ( inMemoryCount ) {
+ long totalIncrement = inMemoryCount.getIncrement().addAndGet( increment );
+
+ if (totalIncrement > maxInMemoryIncrement) {
+ incrementCounterInStorage( queueName, type, shardId, totalIncrement );
+ inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type, shardId ) );
+ inMemoryCount.getIncrement().set( 0L );
+ }
+ }
+
+ }
+
+
+ @Override
+ public long getCounterValue( String queueName, Shard.Type type, long shardId ) {
+
+ String key = queueName + type + shardId;
+
+ synchronized ( inMemoryCounters ) {
+
+ if ( inMemoryCounters.get( key ) == null ) {
+
+ Long value = retrieveCounterFromStorage( queueName, type, shardId );
+
+ if ( value == null ) {
+ throw new NotFoundException(
+ MessageFormat.format( "No counter found for queue {0} type {1} shardId {2}",
+ queueName, type, shardId ));
+ } else {
+ inMemoryCounters.put( key, new InMemoryCount( value ));
+ }
+ }
+ }
+
+ return inMemoryCounters.get( key ).value();
+ }
+
+ void incrementCounterInStorage( String queueName, Shard.Type type, long shardId, long increment ) {
+
+ Statement update = QueryBuilder.update( TABLE_SHARD_COUNTERS )
+ .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) )
+ .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString() ) )
+ .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) )
+ .with( QueryBuilder.incr( COLUMN_COUNTER_VALUE, increment ) );
+ cassandraClient.getSession().execute( update );
+ }
+
+
+ Long retrieveCounterFromStorage( String queueName, Shard.Type type, long shardId ) {
+
+ Statement query = QueryBuilder.select().from( TABLE_SHARD_COUNTERS )
+ .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) )
+ .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString()) )
+ .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) );
+
+ ResultSet resultSet = cassandraClient.getSession().execute( query );
+ List<Row> all = resultSet.all();
+
+ if ( all.size() > 1 ) {
+ throw new QakkaRuntimeException(
+ "Multiple rows for counter " + queueName + " type " + type + " shardId " + shardId );
+ }
+ if ( all.isEmpty() ) {
+ return null;
+ }
+ return all.get(0).getLong( COLUMN_COUNTER_VALUE );
+ }
+
+
+ @Override
+ public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
+ return Collections.EMPTY_LIST;
+ }
+
+ @Override
+ public Collection<TableDefinition> getTables() {
+ return Collections.singletonList( new TableDefinitionStringImpl( TABLE_SHARD_COUNTERS, CQL ) );
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
new file mode 100644
index 0000000..f55b936
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
+import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+
+
+public class QueueMessageSerializationImpl implements QueueMessageSerialization {
+
+ private static final Logger logger = LoggerFactory.getLogger( QueueMessageSerializationImpl.class );
+
+ private final CassandraClient cassandraClient;
+
+ private final ActorSystemFig actorSystemFig;
+ private final ShardStrategy shardStrategy;
+ private final ShardCounterSerialization shardCounterSerialization;
+
+ public final static String COLUMN_QUEUE_NAME = "queue_name";
+ public final static String COLUMN_REGION = "region";
+ public final static String COLUMN_SHARD_ID = "shard_id";
+ public final static String COLUMN_QUEUED_AT = "queued_at";
+ public final static String COLUMN_INFLIGHT_AT = "inflight_at";
+ public final static String COLUMN_QUEUE_MESSAGE_ID = "queue_message_id";
+ public final static String COLUMN_MESSAGE_ID = "message_id";
+ public final static String COLUMN_CONTENT_TYPE = "content_type";
+ public final static String COLUMN_MESSAGE_DATA = "data";
+
+ public final static String TABLE_MESSAGES_AVAILABLE = "messages_available";
+
+ public final static String TABLE_MESSAGES_INFLIGHT = "messages_inflight";
+
+ public final static String TABLE_MESSAGE_DATA = "message_data";
+
+ static final String MESSAGES_AVAILABLE =
+ "CREATE TABLE IF NOT EXISTS messages_available ( " +
+ "queue_name text, " +
+ "region text, " +
+ "shard_id bigint, " +
+ "queue_message_id timeuuid, " +
+ "message_id uuid, " +
+ "queued_at bigint, " +
+ "inflight_at bigint, " +
+ "PRIMARY KEY ((queue_name, region, shard_id), queue_message_id ) " +
+ ") WITH CLUSTERING ORDER BY (queue_message_id ASC); ";
+
+ static final String MESSAGES_INFLIGHT =
+ "CREATE TABLE IF NOT EXISTS messages_inflight ( " +
+ "queue_name text, " +
+ "region text, " +
+ "shard_id bigint, " +
+ "queue_message_id timeuuid, " +
+ "message_id uuid, " +
+ "queued_at bigint, " +
+ "inflight_at bigint, " +
+ "PRIMARY KEY ((queue_name, region, shard_id), queue_message_id ) " +
+ ") WITH CLUSTERING ORDER BY (queue_message_id ASC); ";
+
+ static final String MESSAGE_DATA =
+ "CREATE TABLE IF NOT EXISTS message_data ( " +
+ "message_id uuid, " +
+ "data blob, " +
+ "content_type text, " +
+ "PRIMARY KEY ((message_id)) " +
+ "); ";
+
+ @Inject
+ public QueueMessageSerializationImpl(
+ ActorSystemFig actorSystemFig,
+ ShardStrategy shardStrategy,
+ ShardCounterSerialization shardCounterSerialization,
+ CassandraClient cassandraClient
+ ) {
+ this.actorSystemFig = actorSystemFig;
+ this.shardStrategy = shardStrategy;
+ this.shardCounterSerialization = shardCounterSerialization;
+ this.cassandraClient = cassandraClient;
+ }
+
+
+ @Override
+ public UUID writeMessage(final DatabaseQueueMessage message) {
+
+ final UUID queueMessageId = message.getQueueMessageId() == null ?
+ QakkaUtils.getTimeUuid() : message.getQueueMessageId();
+
+ long queuedAt = message.getQueuedAt() == null ?
+ System.currentTimeMillis() : message.getQueuedAt();
+
+ long inflightAt = message.getInflightAt() == null ?
+ message.getQueuedAt() : message.getInflightAt();
+
+ Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( message.getType() ) ?
+ Shard.Type.DEFAULT : Shard.Type.INFLIGHT;
+
+ if ( message.getShardId() == null ) {
+ Shard shard = shardStrategy.selectShard(
+ message.getQueueName(), actorSystemFig.getRegionLocal(), shardType, queueMessageId );
+ message.setShardId( shard.getShardId() );
+ }
+
+ Statement insert = QueryBuilder.insertInto(getTableName(message.getType()))
+ .value( COLUMN_QUEUE_NAME, message.getQueueName())
+ .value( COLUMN_REGION, message.getRegion())
+ .value( COLUMN_SHARD_ID, message.getShardId())
+ .value( COLUMN_MESSAGE_ID, message.getMessageId())
+ .value( COLUMN_QUEUE_MESSAGE_ID, queueMessageId)
+ .value( COLUMN_INFLIGHT_AT, inflightAt )
+ .value( COLUMN_QUEUED_AT, queuedAt);
+
+ cassandraClient.getSession().execute(insert);
+
+ shardCounterSerialization.incrementCounter( message.getQueueName(), shardType, message.getShardId(), 1 );
+
+ return queueMessageId;
+ }
+
+
+ @Override
+ public DatabaseQueueMessage loadMessage(
+ final String queueName,
+ final String region,
+ final Long shardIdOrNull,
+ final DatabaseQueueMessage.Type type,
+ final UUID queueMessageId ) {
+
+ if ( queueMessageId == null ) {
+ return null;
+ }
+
+ final long shardId;
+ if ( shardIdOrNull == null ) {
+ Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( type ) ?
+ Shard.Type.DEFAULT : Shard.Type.INFLIGHT;
+ Shard shard = shardStrategy.selectShard(
+ queueName, actorSystemFig.getRegionLocal(), shardType, queueMessageId );
+ shardId = shard.getShardId();
+ } else {
+ shardId = shardIdOrNull;
+ }
+
+ Clause queueNameClause = QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName );
+ Clause regionClause = QueryBuilder.eq( COLUMN_REGION, region );
+ Clause shardIdClause = QueryBuilder.eq( COLUMN_SHARD_ID, shardId );
+ Clause queueMessageIdClause = QueryBuilder.eq( COLUMN_QUEUE_MESSAGE_ID, queueMessageId);
+
+ Statement select = QueryBuilder.select().from(getTableName( type ))
+ .where(queueNameClause)
+ .and(regionClause)
+ .and(shardIdClause)
+ .and(queueMessageIdClause);
+
+ Row row = cassandraClient.getSession().execute(select).one();
+
+ if (row == null) {
+ return null;
+ }
+
+ return new DatabaseQueueMessage(
+ row.getUUID( COLUMN_MESSAGE_ID),
+ type,
+ row.getString( COLUMN_QUEUE_NAME),
+ row.getString( COLUMN_REGION),
+ row.getLong( COLUMN_SHARD_ID),
+ row.getLong( COLUMN_QUEUED_AT),
+ row.getLong( COLUMN_INFLIGHT_AT),
+ row.getUUID( COLUMN_QUEUE_MESSAGE_ID)
+ );
+ }
+
+
+ @Override
+ public void deleteMessage(
+ final String queueName,
+ final String region,
+ final Long shardIdOrNull,
+ final DatabaseQueueMessage.Type type,
+ final UUID queueMessageId ) {
+
+ final long shardId;
+ if ( shardIdOrNull == null ) {
+ Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( type ) ?
+ Shard.Type.DEFAULT : Shard.Type.INFLIGHT;
+ Shard shard = shardStrategy.selectShard(
+ queueName, actorSystemFig.getRegionLocal(), shardType, queueMessageId );
+ shardId = shard.getShardId();
+ } else {
+ shardId = shardIdOrNull;
+ }
+
+ Clause queueNameClause = QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName );
+ Clause regionClause = QueryBuilder.eq( COLUMN_REGION, region );
+ Clause shardIdClause = QueryBuilder.eq( COLUMN_SHARD_ID, shardId );
+ Clause queueMessageIdClause = QueryBuilder.eq( COLUMN_QUEUE_MESSAGE_ID, queueMessageId);
+
+ Statement delete = QueryBuilder.delete().from(getTableName( type ))
+ .where(queueNameClause)
+ .and(regionClause)
+ .and(shardIdClause)
+ .and(queueMessageIdClause);
+
+ ResultSet resultSet = cassandraClient.getSession().execute( delete );
+
+ String s = "s";
+ }
+
+
+ @Override
+ public DatabaseQueueMessageBody loadMessageData(final UUID messageId ){
+
+ Clause messageIdClause = QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId );
+
+ Statement select = QueryBuilder.select().from( TABLE_MESSAGE_DATA).where(messageIdClause);
+
+ Row row = cassandraClient.getSession().execute(select).one();
+ if ( row == null ) {
+ return null;
+ }
+
+ return new DatabaseQueueMessageBody(
+ row.getBytes( COLUMN_MESSAGE_DATA),
+ row.getString( COLUMN_CONTENT_TYPE));
+ }
+
+
+ @Override
+ public void writeMessageData( final UUID messageId, final DatabaseQueueMessageBody messageBody ) {
+ Preconditions.checkArgument(QakkaUtils.isTimeUuid(messageId), "MessageId is not a type 1 UUID");
+
+ Statement insert = QueryBuilder.insertInto(TABLE_MESSAGE_DATA)
+ .value( COLUMN_MESSAGE_ID, messageId)
+ .value( COLUMN_MESSAGE_DATA, messageBody.getBlob())
+ .value( COLUMN_CONTENT_TYPE, messageBody.getContentType());
+
+ cassandraClient.getSession().execute(insert);
+ }
+
+
+ @Override
+ public void deleteMessageData( final UUID messageId ) {
+
+ Clause messageIdClause = QueryBuilder.eq(COLUMN_MESSAGE_ID, messageId);
+
+ Statement delete = QueryBuilder.delete().from(TABLE_MESSAGE_DATA)
+ .where(messageIdClause);
+
+ cassandraClient.getSession().execute(delete);
+ }
+
+
+ public static String getTableName(DatabaseQueueMessage.Type messageType){
+
+ String table;
+ if( messageType.equals(DatabaseQueueMessage.Type.DEFAULT)) {
+ table = TABLE_MESSAGES_AVAILABLE;
+ }else if (messageType.equals(DatabaseQueueMessage.Type.INFLIGHT)) {
+ table = TABLE_MESSAGES_INFLIGHT;
+ }else{
+ throw new IllegalArgumentException("Unknown DatabaseQueueMessage Type");
+ }
+
+ return table;
+ }
+
+ @Override
+ public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
+ return Collections.EMPTY_LIST;
+ }
+
+ @Override
+ public Collection<TableDefinition> getTables() {
+ return Lists.newArrayList(
+ new TableDefinitionStringImpl( TABLE_MESSAGES_AVAILABLE, MESSAGES_AVAILABLE ),
+ new TableDefinitionStringImpl( TABLE_MESSAGES_INFLIGHT, MESSAGES_INFLIGHT ),
+ new TableDefinitionStringImpl( TABLE_MESSAGE_DATA, MESSAGE_DATA )
+ );
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueue.java
new file mode 100644
index 0000000..dcf273a
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/DatabaseQueue.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queues;
+
+public class DatabaseQueue {
+
+ private final String name;
+ private final String regions;
+
+ private String defaultDestinations;
+ private Long defaultDelayMs;
+ private Integer retryCount;
+ private Integer handlingTimeoutSec;
+ private String deadLetterQueue;
+
+
+ public DatabaseQueue(final String name,
+ final String regions,
+ final String defaultDestinations,
+ final Long defaultDelayMs,
+ final Integer retryCount,
+ final Integer handlingTimeoutSec,
+ final String deadLetterQueue ){
+
+ this.name = name;
+ this.regions = regions;
+ this.defaultDestinations = defaultDestinations;
+ this.defaultDelayMs = defaultDelayMs;
+ this.retryCount = retryCount;
+ this.handlingTimeoutSec = handlingTimeoutSec;
+ this.deadLetterQueue = deadLetterQueue;
+
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getDeadLetterQueue() {
+ return deadLetterQueue;
+ }
+
+ public Integer getHandlingTimeoutSec() {
+ return handlingTimeoutSec;
+ }
+
+ public Integer getRetryCount() {
+ return retryCount;
+ }
+
+ public Long getDefaultDelayMs() {
+ return defaultDelayMs;
+ }
+
+ public String getDefaultDestinations() {
+ return defaultDestinations;
+ }
+
+ public String getRegions() {
+ return regions;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = name.hashCode();
+ result = ( 31 * result ) + regions.hashCode();
+
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+
+ if( this == obj){
+ return true;
+ }
+
+ if( !(obj instanceof DatabaseQueue)){
+ return false;
+ }
+
+ DatabaseQueue that = (DatabaseQueue) obj;
+
+ if( !this.name.equalsIgnoreCase(that.name)){
+ return false;
+ }
+ if( !this.regions.equals(that.regions)){
+ return false;
+ }
+
+ return true;
+
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/QueueSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/QueueSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/QueueSerialization.java
new file mode 100644
index 0000000..b00d269
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/QueueSerialization.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queues;
+
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+
+import java.util.List;
+
+
+public interface QueueSerialization extends Migration {
+
+ void writeQueue(DatabaseQueue queue);
+
+ DatabaseQueue getQueue(String name);
+
+ void deleteQueue(String name);
+
+ List<String> getListOfQueues();
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
new file mode 100644
index 0000000..932097a
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queues.impl;
+
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
+import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.QueueMessageSerializationImpl;
+import org.apache.usergrid.persistence.qakka.serialization.queues.DatabaseQueue;
+import org.apache.usergrid.persistence.qakka.serialization.queues.QueueSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class QueueSerializationImpl implements QueueSerialization {
+
+ private static final Logger logger = LoggerFactory.getLogger( QueueMessageSerializationImpl.class );
+
+ private final CassandraClient cassandraClient;
+
+ public final static String COLUMN_QUEUE_NAME = "queue_name";
+ public final static String COLUMN_REGIONS = "regions";
+ public final static String COLUMN_DEFAULT_DESTINATIONS = "default_destinations";
+ public final static String COLUMN_DEFAULT_DELAY_MS = "default_delay_ms";
+ public final static String COLUMN_RETRY_COUNT = "retry_count";
+ public final static String COLUMN_HANDLING_TIMEOUT_SEC = "handling_timeout_sec";
+ public final static String COLUMN_DEAD_LETTER_QUEUE = "dead_letter_queue";
+
+
+ public final static String TABLE_QUEUES = "queues";
+
+ static final String CQL =
+ "CREATE TABLE IF NOT EXISTS queues ( " +
+ "queue_name text, " +
+ "regions text, " +
+ "default_destinations text, " +
+ "default_delay_ms bigint, " +
+ "retry_count int, " +
+ "handling_timeout_sec int, " +
+ "dead_letter_queue text, " +
+ "PRIMARY KEY ((queue_name)) " +
+ "); ";
+
+
+ @Inject
+ public QueueSerializationImpl( CassandraClient cassandraClient ) {
+ this.cassandraClient = cassandraClient;
+ }
+
+
+ @Override
+ public void writeQueue(DatabaseQueue queue) {
+
+ Statement insert = QueryBuilder.insertInto(TABLE_QUEUES)
+ .value(COLUMN_QUEUE_NAME, queue.getName())
+ .value(COLUMN_REGIONS, queue.getRegions())
+ .value(COLUMN_DEFAULT_DESTINATIONS, queue.getDefaultDestinations())
+ .value(COLUMN_DEFAULT_DELAY_MS, queue.getDefaultDelayMs())
+ .value(COLUMN_RETRY_COUNT, queue.getRetryCount())
+ .value(COLUMN_HANDLING_TIMEOUT_SEC, queue.getHandlingTimeoutSec())
+ .value(COLUMN_DEAD_LETTER_QUEUE, queue.getDeadLetterQueue());
+
+
+ cassandraClient.getSession().execute(insert);
+
+ }
+
+ @Override
+ public DatabaseQueue getQueue(String name) {
+
+ Clause queueNameClause = QueryBuilder.eq(COLUMN_QUEUE_NAME, name);
+
+ Statement query = QueryBuilder.select().all().from(TABLE_QUEUES)
+ .where(queueNameClause);
+
+ Row row = cassandraClient.getSession().execute(query).one();
+
+ if(row == null){
+ return null;
+ }
+
+ final String queueName = row.getString(COLUMN_QUEUE_NAME);
+ final String regions = row.getString(COLUMN_REGIONS);
+ final String defaultDestinations = row.getString(COLUMN_DEFAULT_DESTINATIONS);
+ final long defaultDelayMs = row.getLong(COLUMN_DEFAULT_DELAY_MS);
+ final int retryCount = row.getInt(COLUMN_RETRY_COUNT);
+ final int handlingTimeoutSec = row.getInt(COLUMN_HANDLING_TIMEOUT_SEC);
+ final String deadLetterQueue = row.getString(COLUMN_DEAD_LETTER_QUEUE);
+
+ return new DatabaseQueue( queueName, regions, defaultDestinations, defaultDelayMs, retryCount,
+ handlingTimeoutSec, deadLetterQueue);
+
+ }
+
+ @Override
+ public void deleteQueue(String name) {
+
+ Clause queueNameClause = QueryBuilder.eq(COLUMN_QUEUE_NAME, name);
+
+ Statement delete = QueryBuilder.delete().from(TABLE_QUEUES)
+ .where(queueNameClause);
+
+ cassandraClient.getSession().execute(delete);
+ }
+
+ @Override
+ public List<String> getListOfQueues() {
+
+ Statement select = QueryBuilder.select().all().from( TABLE_QUEUES );
+ ResultSet rs = cassandraClient.getSession().execute( select );
+
+ return rs.all().stream()
+ .map( row -> row.getString( COLUMN_QUEUE_NAME ))
+ .collect( Collectors.toList() );
+ }
+
+
+ @Override
+ public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
+ return Collections.EMPTY_LIST;
+ }
+
+ @Override
+ public Collection<TableDefinition> getTables() {
+ return Collections.singletonList( new TableDefinitionStringImpl( "queues", CQL ) );
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/Shard.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/Shard.java
new file mode 100644
index 0000000..20c802d
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/Shard.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+
+import java.util.UUID;
+
+public class Shard {
+
+ public enum Type {
+
+ DEFAULT, INFLIGHT
+ }
+
+ private String queueName;
+ private String region;
+ private long shardId;
+ private Type type;
+ private UUID pointer;
+
+ public Shard(final String queueName, final String region, final Type type, final long shardId, UUID pointer){
+
+ this.queueName = queueName;
+ this.region = region;
+ this.type = type;
+ this.shardId = shardId;
+ this.pointer = pointer;
+
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public String getRegion() {
+ return region;
+ }
+
+ public long getShardId() {
+ return shardId;
+ }
+
+ public Type getType() {
+ return type;
+ }
+
+ public UUID getPointer() {
+ return pointer;
+ }
+
+ public void setPointer(UUID pointer) {
+ this.pointer = pointer;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = queueName.hashCode();
+ result = ( 31 * result ) + region.hashCode();
+ result = ( 31 * result ) + (int)shardId;
+ result = ( 31 * result ) + type.hashCode();
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+
+ if( this == obj){
+ return true;
+ }
+
+ if( !(obj instanceof Shard)){
+ return false;
+ }
+
+ Shard that = (Shard) obj;
+
+ if( !this.queueName.equalsIgnoreCase(that.queueName)){
+ return false;
+ }
+ if( !this.region.equalsIgnoreCase(that.region)){
+ return false;
+ }
+ if( this.shardId != that.shardId){
+ return false;
+ }
+ if( !this.type.equals(that.type)){
+ return false;
+ }
+
+ return true;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java
new file mode 100644
index 0000000..c29c548
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerialization.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+
+
+public interface ShardCounterSerialization extends Migration {
+
+ void incrementCounter(String queueName, Shard.Type type, long shardId, long increment);
+
+ long getCounterValue(String name, Shard.Type type, long shardId);
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java
new file mode 100644
index 0000000..31e31ce
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class ShardIterator implements Iterator<Shard> {
+
+ private static final Logger logger = LoggerFactory.getLogger( ShardIterator.class );
+
+ private final CassandraClient cassandraClient;
+
+ private final int PAGE_SIZE = 100;
+ private final String queueName;
+ private final String region;
+ private final Shard.Type shardType;
+ private final Optional<Long> shardId;
+
+ private Iterator<Shard> currentIterator;
+
+ private long nextStart = 0L;
+
+
+ public ShardIterator(
+ final CassandraClient cassandraClient,
+ final String queueName,
+ final String region,
+ final Shard.Type shardtype,
+ final Optional<Long> lastShardId){
+
+ this.queueName = queueName;
+ this.region = region;
+ this.shardType = shardtype;
+ this.shardId = lastShardId.isPresent() ? lastShardId : Optional.of(0L);
+ this.cassandraClient = cassandraClient;
+ }
+
+ @Override
+ public boolean hasNext() {
+
+ if(currentIterator == null || !currentIterator.hasNext()){
+ advance();
+ }
+
+ return currentIterator.hasNext();
+
+ }
+
+ @Override
+ public Shard next() {
+
+ if ( !hasNext() ) {
+ throw new NoSuchElementException( "No next shard exists" );
+ }
+
+ return currentIterator.next();
+
+ }
+
+ private void advance(){
+
+
+ Clause queueNameClause = QueryBuilder.eq( ShardSerializationImpl.COLUMN_QUEUE_NAME, queueName);
+ Clause regionClause = QueryBuilder.eq( ShardSerializationImpl.COLUMN_REGION, region);
+ Clause activeClause = QueryBuilder.eq( ShardSerializationImpl.COLUMN_ACTIVE, 1);
+ Clause shardIdClause;
+ if(nextStart == 0L && shardId.isPresent()){
+ shardIdClause = QueryBuilder.gt( ShardSerializationImpl.COLUMN_SHARD_ID, shardId.get());
+ }else if( nextStart == 0L && !shardId.isPresent()){
+ shardIdClause = QueryBuilder.gte( ShardSerializationImpl.COLUMN_SHARD_ID, 0L);
+
+ }else{
+ shardIdClause = QueryBuilder.gt( ShardSerializationImpl.COLUMN_SHARD_ID, nextStart);
+ }
+
+
+
+ Statement query = QueryBuilder.select().all().from(ShardSerializationImpl.getTableName(shardType))
+ .where(queueNameClause)
+ .and(regionClause)
+ .and(activeClause)
+ .and(shardIdClause)
+ .limit(PAGE_SIZE);
+
+ List<Row> rows = cassandraClient.getSession().execute(query).all();
+
+
+ currentIterator = getIteratorFromRows(rows);
+
+
+ }
+
+
+ private Iterator<Shard> getIteratorFromRows( List<Row> rows){
+
+ List<Shard> shards = new ArrayList<>(rows.size());
+
+ rows.forEach(row -> {
+
+ final String queueName = row.getString( ShardSerializationImpl.COLUMN_QUEUE_NAME);
+ final String region = row.getString( ShardSerializationImpl.COLUMN_REGION);
+ final long shardId = row.getLong( ShardSerializationImpl.COLUMN_SHARD_ID);
+ final UUID pointer = row.getUUID( ShardSerializationImpl.COLUMN_POINTER);
+
+ shards.add(new Shard(queueName, region, shardType, shardId, pointer));
+
+ nextStart = shardId;
+
+ });
+
+ return shards.iterator();
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java
new file mode 100644
index 0000000..c0173ab
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerialization.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+
+
+public interface ShardSerialization extends Migration {
+
+ void createShard(final Shard shard);
+
+ Shard loadShard(final Shard shard);
+
+ void deleteShard(final Shard shard);
+
+ void updateShardPointer(final Shard shard);
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategy.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategy.java
new file mode 100644
index 0000000..013f0b6
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardStrategy.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding;
+
+import java.util.UUID;
+
+
+public interface ShardStrategy {
+
+ /**
+ * Select shard that should be used for the specified Queue Message.
+ * @param queueName Name of queue
+ * @param region Region
+ * @param type Indicates whether message is inflight or available
+ * @param pointer Queue Message ID (must be Time-based)
+ */
+ Shard selectShard(String queueName, String region, Shard.Type type, UUID pointer);
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3075dce1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/PlaceholderShardStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/PlaceholderShardStrategy.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/PlaceholderShardStrategy.java
new file mode 100644
index 0000000..3ca79e6
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/PlaceholderShardStrategy.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.sharding.impl;
+
+
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardStrategy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+
+// TODO: delete me!
+
+public class PlaceholderShardStrategy implements ShardStrategy {
+
+ private Map<String, Shard> shardMap = new HashMap<>();
+
+
+ @Override
+ public Shard selectShard(String queueName, String region, Shard.Type type, UUID pointer) {
+ String key = queueName + region + type;
+ shardMap.putIfAbsent( key, new Shard( queueName, region, type, 0L, pointer ) );
+ return shardMap.get( key );
+ }
+}