You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/01/28 23:21:48 UTC
[38/96] [abbrv] [partial] Change package namespace to
org.apache.usergrid
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ConnectionRefImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ConnectionRefImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ConnectionRefImpl.java
new file mode 100644
index 0000000..ed257e6
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ConnectionRefImpl.java
@@ -0,0 +1,627 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang.StringUtils;
+import org.apache.usergrid.persistence.ConnectedEntityRef;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+
+import static org.apache.usergrid.persistence.SimpleEntityRef.ref;
+import static org.apache.usergrid.utils.ConversionUtils.ascii;
+import static org.apache.usergrid.utils.ConversionUtils.uuidToBytesNullOk;
+
+
+/** @author edanuff */
+public class ConnectionRefImpl implements ConnectionRef {
+
+ /** NOTE(review): not referenced in this class; presumably a limit on chained links - confirm against callers. */
+ public static final int MAX_LINKS = 1;
+
+ /**
+ * Index variant that uses neither the connection type nor the connected entity type (see the
+ * variant-based getIndexId overloads). The four variant values also serve as array positions in getIndexIds.
+ */
+ public static final int ALL = 0;
+ /**
+ * Index variant keyed by connection type only.
+ */
+ public static final int BY_CONNECTION_TYPE = 1;
+ /**
+ * Index variant keyed by connected entity type only.
+ */
+ public static final int BY_ENTITY_TYPE = 2;
+ /**
+ * Index variant keyed by both connection type and connected entity type.
+ */
+ public static final int BY_CONNECTION_AND_ENTITY_TYPE = 3;
+
+ /**
+ * Placeholder substituted for a missing (null or empty) type when building ids.
+ */
+ public static final String NULL_ENTITY_TYPE = "Null";
+ /**
+ * All-zero UUID used as the placeholder for a missing id; connectionsNull treats it like null.
+ */
+ public static final UUID NULL_ID = new UUID( 0, 0 );
+
+ private static final Logger logger = LoggerFactory.getLogger( ConnectionRefImpl.class );
+
+
+ /** Entity type reported by {@link #getType()} for every connection. */
+ public static final String CONNECTION_ENTITY_TYPE = "Connection";
+ /** Connection type used by getConnectionToConnectionEntity() to link an entity to the connection itself. */
+ public static final String CONNECTION_ENTITY_CONNECTION_TYPE = "connection";
+
+
+ /** Entity the connection starts from. */
+ private final EntityRef connectingEntity;
+
+ /** Intermediate connections between the connecting entity and the final connected entity. */
+ private final List<ConnectedEntityRef> pairedConnections;
+
+ /** Final target of the connection, together with its connection type. */
+ private final ConnectedEntityRef connectedEntity;
+
+
+ /**
+  * Creates a connection with default-constructed endpoints: the reference returned by
+  * {@code SimpleEntityRef.ref()}, no paired connections and a no-arg {@code ConnectedEntityRefImpl}.
+  */
+ public ConnectionRefImpl() {
+     this.connectingEntity = ref();
+     this.pairedConnections = Collections.emptyList();
+     this.connectedEntity = new ConnectedEntityRefImpl();
+ }
+
+
+ /**
+  * Creates a direct connection (no paired connections) from its individual parts.
+  *
+  * @param connectingEntityType type of the entity the connection starts from
+  * @param connectingEntityId id of the entity the connection starts from
+  * @param connectionType type of the connection
+  * @param connectedEntityType type of the entity the connection points to
+  * @param connectedEntityId id of the entity the connection points to
+  */
+ public ConnectionRefImpl( String connectingEntityType, UUID connectingEntityId, String connectionType,
+                           String connectedEntityType, UUID connectedEntityId ) {
+     this.connectingEntity = ref( connectingEntityType, connectingEntityId );
+     this.pairedConnections = Collections.emptyList();
+     this.connectedEntity = new ConnectedEntityRefImpl( connectionType, connectedEntityType, connectedEntityId );
+ }
+
+
+ /**
+  * Creates a direct connection (no paired connections) from {@code source} to {@code target}.
+  *
+  * @param source entity the connection starts from
+  * @param connectionType type of the connection
+  * @param target entity the connection points to
+  */
+ public ConnectionRefImpl( EntityRef source, String connectionType, EntityRef target ) {
+     connectingEntity = ref( source );
+     pairedConnections = Collections.emptyList();
+     connectedEntity = new ConnectedEntityRefImpl( connectionType, target );
+ }
+
+
+ /**
+  * Copy constructor. Shares the endpoint references and the paired-connection list of {@code connection};
+  * a null paired-connection list is replaced by an empty one.
+  *
+  * @param connection connection to copy
+  */
+ public ConnectionRefImpl( ConnectionRef connection ) {
+     this.connectingEntity = connection.getConnectingEntity();
+
+     List<ConnectedEntityRef> paired = connection.getPairedConnections();
+     this.pairedConnections = ( paired != null ) ? paired : Collections.<ConnectedEntityRef>emptyList();
+
+     this.connectedEntity = connection.getConnectedEntity();
+ }
+
+
+ /**
+  * Creates a connection from {@code connectingEntity} through a chain of connections. The last element of
+  * {@code connections} becomes the connected entity; every element before it becomes a paired connection.
+  * With no connections, the connected entity is a no-arg {@code ConnectedEntityRefImpl}.
+  *
+  * @param connectingEntity entity the connection starts from
+  * @param connections chain of connections, final target last
+  */
+ public ConnectionRefImpl( EntityRef connectingEntity, ConnectedEntityRef... connections ) {
+
+     this.connectingEntity = ref( connectingEntity );
+
+     ConnectedEntityRef target = new ConnectedEntityRefImpl();
+     List<ConnectedEntityRef> leading = Collections.emptyList();
+     if ( connections.length > 0 ) {
+         int lastIndex = connections.length - 1;
+         target = connections[lastIndex];
+
+         if ( lastIndex > 0 ) {
+             // copyOfRange's upper bound is exclusive: lastIndex keeps elements 0..lastIndex-1. The previous
+             // bound (length - 2) silently dropped the element just before the target.
+             // NOTE(review): ids derived from multi-hop connections change with this fix.
+             leading = Arrays.asList( Arrays.copyOfRange( connections, 0, lastIndex ) );
+         }
+     }
+     this.pairedConnections = leading;
+     this.connectedEntity = target;
+ }
+
+
+ /**
+  * Extends an existing connection with further hops. When {@code connections} is non-empty, the paired
+  * connections become: the paired connections of {@code connection}, its connected entity, then every element
+  * of {@code connections} except the last, which becomes the new connected entity. When {@code connections}
+  * is empty, only the connecting entity is taken from {@code connection}.
+  *
+  * @param connection connection to extend; must not be null
+  * @param connections additional hops, final target last
+  * @throws NullPointerException if {@code connection} is null
+  */
+ public ConnectionRefImpl( ConnectionRef connection, ConnectedEntityRef... connections ) {
+
+     if ( connection == null ) {
+         throw new NullPointerException( "ConnectionImpl constructor \'connection\' cannot be null" );
+     }
+
+     this.connectingEntity = connection.getConnectingEntity();
+
+     List<ConnectedEntityRef> chain = new ArrayList<ConnectedEntityRef>();
+     if ( connections.length > 0 ) {
+         int lastIndex = connections.length - 1;
+
+         List<ConnectedEntityRef> existing = connection.getPairedConnections();
+         if ( existing != null ) {
+             chain.addAll( existing );
+         }
+         chain.add( connection.getConnectedEntity() );
+
+         // copyOfRange's upper bound is exclusive: lastIndex keeps every hop before the final target. The
+         // previous bound (length - 2) dropped the hop just before the target.
+         // NOTE(review): ids derived from multi-hop connections change with this fix.
+         chain.addAll( Arrays.asList( Arrays.copyOfRange( connections, 0, lastIndex ) ) );
+
+         this.connectedEntity = connections[lastIndex];
+     }
+     else {
+         this.connectedEntity = new ConnectedEntityRefImpl();
+     }
+     this.pairedConnections = chain;
+ }
+
+
+ /**
+  * Creates a connection from already-assembled parts. The arguments are stored as given (no copies), except
+  * that a null {@code pairedConnections} is replaced by an empty list so that the id and index-id methods,
+  * which call {@code toArray} on the list, do not fail.
+  *
+  * @param connectingEntity entity the connection starts from
+  * @param pairedConnections intermediate connections; may be null
+  * @param connectedEntity final target of the connection
+  */
+ public ConnectionRefImpl( EntityRef connectingEntity, List<ConnectedEntityRef> pairedConnections,
+                           ConnectedEntityRef connectedEntity ) {
+     this.connectingEntity = connectingEntity;
+     if ( pairedConnections == null ) {
+         this.pairedConnections = Collections.emptyList();
+     }
+     else {
+         this.pairedConnections = pairedConnections;
+     }
+     this.connectedEntity = connectedEntity;
+ }
+
+
+ /** @return always null in this implementation */
+ public UUID getSearchIndexId() {
+ return null;
+ }
+
+
+ /** @return always null in this implementation */
+ public String getSearchConnectionType() {
+ return null;
+ }
+
+
+ /** @return always null in this implementation */
+ public String getSearchResultType() {
+ return null;
+ }
+
+
+ /** @return always null in this implementation */
+ public String getSearchIndexName() {
+ return null;
+ }
+
+
+ /** @return the entity this connection starts from, as stored at construction */
+ @Override
+ public EntityRef getConnectingEntity() {
+ return connectingEntity;
+ }
+
+
+ /**
+  * @return the type of the connecting entity, or null when there is no connecting entity
+  */
+ public String getConnectingEntityType() {
+     return ( connectingEntity != null ) ? connectingEntity.getType() : null;
+ }
+
+
+ /**
+  * @return the id of the connecting entity, or null when there is no connecting entity
+  */
+ public UUID getConnectingEntityId() {
+     return ( connectingEntity != null ) ? connectingEntity.getUuid() : null;
+ }
+
+
+ /** @return the internal list of paired (intermediate) connections; the list is not copied */
+ @Override
+ public List<ConnectedEntityRef> getPairedConnections() {
+ return pairedConnections;
+ }
+
+
+ /** @return the first paired connection, or null when there are none */
+ public ConnectedEntityRef getFirstPairedConnection() {
+     if ( pairedConnections == null || pairedConnections.isEmpty() ) {
+         return null;
+     }
+     return pairedConnections.get( 0 );
+ }
+
+
+ /** @return the entity id of the first paired connection, or null when there are no paired connections */
+ public UUID getFirstPairedConnectedEntityId() {
+     ConnectedEntityRef first = getFirstPairedConnection();
+     return ( first == null ) ? null : first.getUuid();
+ }
+
+
+ /** @return the entity type of the first paired connection, or null when there are no paired connections */
+ public String getFirstPairedConnectedEntityType() {
+     ConnectedEntityRef first = getFirstPairedConnection();
+     return ( first == null ) ? null : first.getType();
+ }
+
+
+ /** @return the connection type of the first paired connection, or null when there are no paired connections */
+ public String getFirstPairedConnectionType() {
+     ConnectedEntityRef first = getFirstPairedConnection();
+     return ( first == null ) ? null : first.getConnectionType();
+ }
+
+
+ /** @return the final target of this connection, as stored at construction */
+ @Override
+ public ConnectedEntityRef getConnectedEntity() {
+ return connectedEntity;
+ }
+
+
+ /**
+  * @return the connection type of the connected entity, or null when there is no connected entity
+  */
+ @Override
+ public String getConnectionType() {
+     return ( connectedEntity != null ) ? connectedEntity.getConnectionType() : null;
+ }
+
+
+ /**
+  * @return the entity type of the connected entity, or null when there is no connected entity
+  */
+ public String getConnectedEntityType() {
+     return ( connectedEntity != null ) ? connectedEntity.getType() : null;
+ }
+
+
+ /**
+  * @return the id of the connected entity, or null when there is no connected entity (same null handling as
+  *         {@link #getConnectedEntityType()} and {@link #getConnectionType()})
+  */
+ public UUID getConnectedEntityId() {
+     if ( connectedEntity == null ) {
+         return null;
+     }
+     return connectedEntity.getUuid();
+ }
+
+
+ /** Cached connection id; null until {@link #getUuid()} has computed a non-null value. */
+ private UUID id;
+
+
+ /**
+  * Returns the connection id, computing it via the static {@code getId} on first use and caching the result.
+  *
+  * @return connection id
+  */
+ @Override
+ public UUID getUuid() {
+     if ( id != null ) {
+         return id;
+     }
+     id = getId( getConnectingEntity(), getConnectedEntity(),
+             getPairedConnections().toArray( new ConnectedEntityRef[0] ) );
+     return id;
+ }
+
+
+ /** @return {@link #CONNECTION_ENTITY_TYPE}, for every connection */
+ @Override
+ public String getType() {
+ return CONNECTION_ENTITY_TYPE;
+ }
+
+
+ public UUID getIndexId() {
+ return getIndexId( getConnectingEntity(), getConnectionType(), getConnectedEntityType(),
+ pairedConnections.toArray( new ConnectedEntityRef[0] ) );
+ }
+
+
+ public UUID getConnectingIndexId() {
+ return getIndexId( getConnectingEntity(), getConnectionType(), null,
+ pairedConnections.toArray( new ConnectedEntityRef[0] ) );
+ }
+
+
+ public ConnectionRefImpl getConnectionToConnectionEntity() {
+ return new ConnectionRefImpl( getConnectingEntity(),
+ new ConnectedEntityRefImpl( CONNECTION_ENTITY_CONNECTION_TYPE, CONNECTION_ENTITY_TYPE, getUuid() ) );
+ }
+
+
+ /** @return index ids */
+ public UUID[] getIndexIds() {
+
+ return getIndexIds( getConnectingEntity(), getConnectedEntity().getConnectionType(),
+ getConnectedEntity().getType(), getPairedConnections().toArray( new ConnectedEntityRef[0] ) );
+ }
+
+
+ /** @return {@code type}, or {@link #NULL_ENTITY_TYPE} when {@code type} is null or empty */
+ static String typeOrDefault( String type ) {
+     boolean missing = ( type == null ) || type.isEmpty();
+     return missing ? NULL_ENTITY_TYPE : type;
+ }
+
+
+ /** @return {@code uuid}, or {@link #NULL_ID} when {@code uuid} is null */
+ static UUID idOrDefault( UUID uuid ) {
+     return ( uuid != null ) ? uuid : NULL_ID;
+ }
+
+
+ public static boolean connectionsNull( ConnectedEntityRef... pairedConnections ) {
+ if ( ( pairedConnections == null ) || ( pairedConnections.length == 0 ) ) {
+ return true;
+ }
+
+ for ( ConnectedEntityRef pairedConnection : pairedConnections ) {
+ if ( pairedConnection == null || pairedConnection.getUuid() == null || pairedConnection.getUuid().equals(
+ NULL_ID ) ) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+
+ /** Varargs convenience: returns the arguments as the array they were passed in. */
+ public static ConnectedEntityRef[] getConnections( ConnectedEntityRef... connections ) {
+ return connections;
+ }
+
+
+ /** Varargs convenience: returns the arguments as a fixed-size list backed by the argument array. */
+ public static List<ConnectedEntityRef> getConnectionsList( ConnectedEntityRef... connections ) {
+ return Arrays.asList( connections );
+ }
+
+
+ /**
+ * Connection id for a direct connection; delegates to the five-argument overload with a null paired
+ * connection type and id.
+ *
+ * @return connection id
+ */
+ public static UUID getId( UUID connectingEntityId, String connectionType, UUID connectedEntityId ) {
+ return getId( connectingEntityId, null, null, connectionType, connectedEntityId );
+ }
+
+
+ /**
+  * Connection id for a connection with at most one paired connection. The ids and types are wrapped into
+  * references (entity types left null) and handed to {@code getId(EntityRef, ConnectedEntityRef,
+  * ConnectedEntityRef...)}, which packs them into bytes and hashes them with
+  * {@code UUID.nameUUIDFromBytes()}.
+  * <p>
+  * Note carried over from the original author: strings in the packed structure are truncated to 16 ascii
+  * bytes; since the result is hashed, plain string concatenation could technically be used instead.
+  *
+  * @return connection id
+  */
+ public static UUID getId( UUID connectingEntityId, String pairedConnectionType, UUID pairedConnectingEntityId,
+                           String connectionType, UUID connectedEntityId ) {
+
+     EntityRef source = ref( connectingEntityId );
+     ConnectedEntityRef paired = new ConnectedEntityRefImpl( pairedConnectionType, null, pairedConnectingEntityId );
+     ConnectedEntityRef target = new ConnectedEntityRefImpl( connectionType, null, connectedEntityId );
+
+     return getId( source, target, paired );
+ }
+
+
+ /**
+  * Computes the connection id. When both the paired connections and the connected entity count as absent
+  * (see {@link #connectionsNull}), the id is simply the connecting entity's id. Otherwise the connecting
+  * entity id, each paired connection's lower-cased connection type and id, and the connected entity's
+  * lower-cased connection type ({@link #NULL_ENTITY_TYPE} when null) and id are written to a byte stream and
+  * hashed with {@code UUID.nameUUIDFromBytes()}.
+  *
+  * @return connection id, or null if writing to the byte stream fails (the failure is logged)
+  */
+ public static UUID getId( EntityRef connectingEntity, ConnectedEntityRef connectedEntity,
+                           ConnectedEntityRef... pairedConnections ) {
+
+     if ( connectionsNull( pairedConnections ) && connectionsNull( connectedEntity ) ) {
+         return connectingEntity.getUuid();
+     }
+
+     try {
+         ByteArrayOutputStream packed = new ByteArrayOutputStream( 16 + ( 32 * pairedConnections.length ) );
+
+         packed.write( uuidToBytesNullOk( connectingEntity.getUuid() ) );
+
+         for ( ConnectedEntityRef hop : pairedConnections ) {
+             String hopType = hop.getConnectionType();
+             UUID hopId = hop.getUuid();
+
+             packed.write( ascii( StringUtils.lowerCase( hopType ) ) );
+             packed.write( uuidToBytesNullOk( hopId ) );
+         }
+
+         String targetType = connectedEntity.getConnectionType();
+         if ( targetType == null ) {
+             targetType = NULL_ENTITY_TYPE;
+         }
+         UUID targetId = connectedEntity.getUuid();
+
+         packed.write( ascii( StringUtils.lowerCase( targetType ) ) );
+         packed.write( uuidToBytesNullOk( targetId ) );
+
+         return UUID.nameUUIDFromBytes( packed.toByteArray() );
+     }
+     catch ( IOException e ) {
+         logger.error( "Unable to create connection UUID", e );
+         return null;
+     }
+ }
+
+
+ /**
+ * Index id for a direct connection; delegates to the five-argument overload with a null paired connection
+ * type and id.
+ *
+ * @return connection index id
+ */
+ public static UUID getIndexId( UUID connectingEntityId, String connectionType, String connectedEntityType ) {
+ return getIndexId( connectingEntityId, null, null, connectionType, connectedEntityType );
+ }
+
+
+ /**
+  * Index id for a connection with at most one paired connection; wraps the ids into references and delegates
+  * to {@code getIndexId(EntityRef, String, String, ConnectedEntityRef...)}.
+  *
+  * @return connection index id
+  */
+ public static UUID getIndexId( UUID connectingEntityId, String pairedConnectionType, UUID pairedConnectingEntityId,
+                                String connectionType, String connectedEntityType ) {
+
+     EntityRef source = ref( connectingEntityId );
+     ConnectedEntityRef paired = new ConnectedEntityRefImpl( pairedConnectionType, null, pairedConnectingEntityId );
+
+     return getIndexId( source, connectionType, connectedEntityType, paired );
+ }
+
+
+ /**
+  * Computes a connection index id. When the paired connections count as absent (see
+  * {@link #connectionsNull}) and both types are null, the id is the connecting entity's id. Otherwise the
+  * connecting entity id, each paired connection's lower-cased connection type and id, and the lower-cased
+  * connection type and connected entity type ({@link #NULL_ENTITY_TYPE} when null) are written to a byte
+  * stream and hashed with {@code UUID.nameUUIDFromBytes()}.
+  *
+  * @param connectingEntity entity the connection starts from
+  * @param connectionType connection type to index by, or null
+  * @param connectedEntityType connected entity type to index by, or null
+  * @param pairedConnections intermediate connections; a null array is treated as empty
+  * @return connection index id, or null if writing to the byte stream fails (the failure is logged)
+  */
+ public static UUID getIndexId( EntityRef connectingEntity, String connectionType, String connectedEntityType,
+                                ConnectedEntityRef... pairedConnections ) {
+
+     UUID uuid = null;
+     try {
+
+         if ( connectionsNull( pairedConnections ) && ( ( connectionType == null ) && ( connectedEntityType
+                 == null ) ) ) {
+             return connectingEntity.getUuid();
+         }
+
+         // connectionsNull() accepts a null array, so the packing code below has to as well
+         if ( pairedConnections == null ) {
+             pairedConnections = new ConnectedEntityRef[0];
+         }
+
+         ByteArrayOutputStream byteStream = new ByteArrayOutputStream( 16 + ( 32 * pairedConnections.length ) );
+
+         byteStream.write( uuidToBytesNullOk( connectingEntity.getUuid() ) );
+
+         for ( ConnectedEntityRef connection : pairedConnections ) {
+             String type = connection.getConnectionType();
+             UUID id = connection.getUuid();
+
+             byteStream.write( ascii( StringUtils.lowerCase( type ) ) );
+             byteStream.write( uuidToBytesNullOk( id ) );
+         }
+
+         if ( connectionType == null ) {
+             connectionType = NULL_ENTITY_TYPE;
+         }
+         if ( connectedEntityType == null ) {
+             connectedEntityType = NULL_ENTITY_TYPE;
+         }
+
+         byteStream.write( ascii( StringUtils.lowerCase( connectionType ) ) );
+         byteStream.write( ascii( StringUtils.lowerCase( connectedEntityType ) ) );
+
+         byte[] raw_id = byteStream.toByteArray();
+
+         // only pay for the hex encoding when the message will actually be emitted
+         if ( logger.isInfoEnabled() ) {
+             logger.info( "raw connection index id: {}", Hex.encodeHexString( raw_id ) );
+         }
+
+         uuid = UUID.nameUUIDFromBytes( raw_id );
+
+         logger.info( "connection index uuid: {}", uuid );
+     }
+     catch ( IOException e ) {
+         logger.error( "Unable to create connection index UUID", e );
+     }
+     return uuid;
+ }
+
+
+ /**
+  * Index id for the given variant and at most one paired connection; wraps the ids into references and
+  * delegates to {@code getIndexId(int, EntityRef, String, String, ConnectedEntityRef...)}.
+  *
+  * @param variant one of {@link #ALL}, {@link #BY_CONNECTION_TYPE}, {@link #BY_ENTITY_TYPE},
+  *        {@link #BY_CONNECTION_AND_ENTITY_TYPE}
+  * @return connection index id
+  */
+ public static UUID getIndexId( int variant, UUID connectingEntityId, String pairedConnectionType,
+                                UUID pairedConnectingEntityId, String connectionType, String connectedEntityType ) {
+
+     EntityRef source = ref( connectingEntityId );
+     ConnectedEntityRef paired = new ConnectedEntityRefImpl( pairedConnectionType, null, pairedConnectingEntityId );
+
+     return getIndexId( variant, source, connectionType, connectedEntityType, paired );
+ }
+
+
+ public static UUID getIndexId( int variant, EntityRef connectingEntity, String connectionType,
+ String connectedEntityType, ConnectedEntityRef... pairedConnections ) {
+
+ switch ( variant ) {
+
+ case ALL:
+ if ( connectionsNull( pairedConnections ) ) {
+ return connectingEntity.getUuid();
+ }
+ else {
+ return getIndexId( connectingEntity, null, null, pairedConnections );
+ }
+
+ case BY_ENTITY_TYPE:
+ return getIndexId( connectingEntity, null, connectedEntityType, pairedConnections );
+
+ case BY_CONNECTION_TYPE:
+ return getIndexId( connectingEntity, connectionType, null, pairedConnections );
+
+ case BY_CONNECTION_AND_ENTITY_TYPE:
+ return getIndexId( connectingEntity, connectionType, connectedEntityType, pairedConnections );
+ }
+
+ return connectingEntity.getUuid();
+ }
+
+
+ /**
+ * Index ids for a direct connection; delegates to the five-argument overload with a null paired connection
+ * type and id.
+ *
+ * @return index ids
+ */
+ public static UUID[] getIndexIds( UUID connectingEntityId, String connectionType, String connectedEntityType ) {
+ return getIndexIds( connectingEntityId, null, null, connectionType, connectedEntityType );
+ }
+
+
+ /**
+  * Computes the index id of every variant.
+  *
+  * @return array of four index ids, positioned by variant value ({@link #ALL} ..
+  *         {@link #BY_CONNECTION_AND_ENTITY_TYPE})
+  */
+ public static UUID[] getIndexIds( UUID connectingEntityId, String pairedConnectionType,
+                                   UUID pairedConnectingEntityId, String connectionType,
+                                   String connectedEntityType ) {
+
+     UUID[] ids = new UUID[4];
+
+     for ( int variant = ALL; variant <= BY_CONNECTION_AND_ENTITY_TYPE; variant++ ) {
+         ids[variant] = getIndexId( variant, connectingEntityId, pairedConnectionType, pairedConnectingEntityId,
+                 connectionType, connectedEntityType );
+     }
+
+     return ids;
+ }
+
+
+ /**
+  * Computes the index id of every variant for the given connection parts.
+  *
+  * @return array of four index ids, positioned by variant value ({@link #ALL} ..
+  *         {@link #BY_CONNECTION_AND_ENTITY_TYPE})
+  */
+ public static UUID[] getIndexIds( EntityRef connectingEntity, String connectionType, String connectedEntityType,
+                                   ConnectedEntityRef... pairedConnections ) {
+
+     UUID[] ids = new UUID[4];
+
+     for ( int variant = ALL; variant <= BY_CONNECTION_AND_ENTITY_TYPE; variant++ ) {
+         ids[variant] =
+                 getIndexId( variant, connectingEntity, connectionType, connectedEntityType, pairedConnections );
+     }
+
+     return ids;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CounterUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CounterUtils.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CounterUtils.java
new file mode 100644
index 0000000..2de4f0d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CounterUtils.java
@@ -0,0 +1,397 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.lang.StringUtils;
+import org.apache.usergrid.count.Batcher;
+import org.apache.usergrid.count.common.Count;
+import org.apache.usergrid.mq.Message;
+import org.apache.usergrid.mq.cassandra.QueuesCF;
+import org.apache.usergrid.persistence.CounterResolution;
+import org.apache.usergrid.persistence.entities.Event;
+
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.PrefixedSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import me.prettyprint.hector.api.beans.HCounterColumn;
+import me.prettyprint.hector.api.mutation.Mutator;
+import static me.prettyprint.hector.api.factory.HFactory.createColumn;
+import static me.prettyprint.hector.api.factory.HFactory.createCounterColumn;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_COUNTERS;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.APPLICATION_AGGREGATE_COUNTERS;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COUNTERS;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
+import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
+
+
+public class CounterUtils {
+
+ public static final Logger logger = LoggerFactory.getLogger( CounterUtils.class );
+
+ // Shared serializer instances (long, string, byte buffer, UUID).
+ public static final LongSerializer le = new LongSerializer();
+ public static final StringSerializer se = new StringSerializer();
+ public static final ByteBufferSerializer be = new ByteBufferSerializer();
+ public static final UUIDSerializer ue = new UUIDSerializer();
+
+ // Counter mode: "o" and "p" write counters through the mutator, "n" and "p" hand counts to the batcher.
+ private String counterType = "o";
+
+ // Receives Count objects in the "n" and "p" modes; must be set before those modes are used.
+ private Batcher batcher;
+
+
+ /** Sets the batcher that receives counts when the counter type is "n" or "p". */
+ public void setBatcher( Batcher batcher ) {
+ this.batcher = batcher;
+ }
+
+
+ /** Set the type to 'new' ("n"), 'parallel' ("p"), 'old' ("o" - the default) If not one of the above, do nothing */
+ public void setCounterType( String counterType ) {
+ if ( counterType == null ) {
+ return;
+ }
+ if ( "n".equals( counterType ) || "p".equals( counterType ) || "o".equals( counterType ) ) {
+ this.counterType = counterType;
+ }
+ }
+
+
+ /** @return true only when the counter type is "n" */
+ public boolean getIsCounterBatched() {
+ return "n".equals( counterType );
+ }
+
+
+ /**
+  * Describes one aggregate counter row: a counter name plus optional user, group, queue and category
+  * dimensions. A null dimension is written as {@link #STAR} in the row key.
+  */
+ public static class AggregateCounterSelection {
+     public static final String COLON = ":";
+     public static final String STAR = "*";
+     String name;
+     UUID userId;
+     UUID groupId;
+     UUID queueId;
+     String category;
+
+
+     /** Creates a selection; {@code name} is lower-cased and must not be null. */
+     public AggregateCounterSelection( String name, UUID userId, UUID groupId, UUID queueId, String category ) {
+         this.name = name.toLowerCase();
+         this.userId = userId;
+         this.groupId = groupId;
+         this.queueId = queueId;
+         this.category = category;
+     }
+
+
+     /** Overwrites every field of this selection; {@code name} is lower-cased and must not be null. */
+     public void apply( String name, UUID userId, UUID groupId, UUID queueId, String category ) {
+         this.name = name.toLowerCase();
+         this.userId = userId;
+         this.groupId = groupId;
+         this.queueId = queueId;
+         this.category = category;
+     }
+
+
+     /** @return the counter name */
+     public String getName() {
+         return this.name;
+     }
+
+
+     /** Sets the counter name as given (unlike the constructor, no lower-casing is applied). */
+     public void setName( String name ) {
+         this.name = name;
+     }
+
+
+     /** @return the user dimension, or null for "any" */
+     public UUID getUserId() {
+         return this.userId;
+     }
+
+
+     /** @param userId the user dimension, or null for "any" */
+     public void setUserId( UUID userId ) {
+         this.userId = userId;
+     }
+
+
+     /** @return the group dimension, or null for "any" */
+     public UUID getGroupId() {
+         return this.groupId;
+     }
+
+
+     /** @param groupId the group dimension, or null for "any" */
+     public void setGroupId( UUID groupId ) {
+         this.groupId = groupId;
+     }
+
+
+     /** @return the queue dimension, or null for "any" */
+     public UUID getQueueId() {
+         return this.queueId;
+     }
+
+
+     /** @param queueId the queue dimension, or null for "any" */
+     public void setQueueId( UUID queueId ) {
+         this.queueId = queueId;
+     }
+
+
+     /** @return the category dimension, or null for "any" */
+     public String getCategory() {
+         return this.category;
+     }
+
+
+     /** @param category the category dimension, or null for "any" */
+     public void setCategory( String category ) {
+         this.category = category;
+     }
+
+
+     /** @return the row key of this selection at the given resolution */
+     public String getRow( CounterResolution resolution ) {
+         return rowBuilder( name, userId, groupId, queueId, category, resolution );
+     }
+
+
+     /**
+      * Builds a row key of the form {@code name:user:group:queue:category:RESOLUTION}, writing
+      * {@link #STAR} for every null dimension.
+      */
+     public static String rowBuilder( String name, UUID userId, UUID groupId, UUID queueId, String category,
+                                      CounterResolution resolution ) {
+         String userPart = ( userId == null ) ? STAR : userId.toString();
+         String groupPart = ( groupId == null ) ? STAR : groupId.toString();
+         String queuePart = ( queueId == null ) ? STAR : queueId.toString();
+         String categoryPart = ( category == null ) ? STAR : category;
+
+         StringBuilder row = new StringBuilder( name );
+         row.append( COLON ).append( userPart );
+         row.append( COLON ).append( groupPart );
+         row.append( COLON ).append( queuePart );
+         row.append( COLON ).append( categoryPart );
+         row.append( COLON ).append( resolution.name() );
+         return row.toString();
+     }
+ }
+
+
+ /**
+  * Adds aggregate counter increments for every counter carried by {@code event}, using the event's user,
+  * group, category and timestamp; counter names are lower-cased. Does nothing when the event has no counters.
+  */
+ public void addEventCounterMutations( Mutator<ByteBuffer> m, UUID applicationId, Event event, long timestamp ) {
+     if ( event.getCounters() == null ) {
+         return;
+     }
+     for ( Entry<String, Integer> counter : event.getCounters().entrySet() ) {
+         batchIncrementAggregateCounters( m, applicationId, event.getUser(), event.getGroup(), null,
+                 event.getCategory(), counter.getKey().toLowerCase(), counter.getValue(), event.getTimestamp(),
+                 timestamp );
+     }
+ }
+
+
+ /**
+  * Adds aggregate counter increments for every counter carried by {@code msg}, attributed to
+  * {@code queueId} with the message's category and timestamp; counter names are lower-cased. Does nothing
+  * when the message has no counters.
+  */
+ public void addMessageCounterMutations( Mutator<ByteBuffer> m, UUID applicationId, UUID queueId, Message msg,
+                                         long timestamp ) {
+     if ( msg.getCounters() == null ) {
+         return;
+     }
+     for ( Entry<String, Integer> counter : msg.getCounters().entrySet() ) {
+         batchIncrementAggregateCounters( m, applicationId, null, null, queueId, msg.getCategory(),
+                 counter.getKey().toLowerCase(), counter.getValue(), msg.getTimestamp(), timestamp );
+     }
+ }
+
+
+ /**
+  * Increments every counter in {@code counters} (names lower-cased) for the given dimensions. Does nothing
+  * when {@code counters} is null.
+  */
+ public void batchIncrementAggregateCounters( Mutator<ByteBuffer> m, UUID applicationId, UUID userId, UUID groupId,
+                                              UUID queueId, String category, Map<String, Long> counters,
+                                              long timestamp ) {
+     if ( counters == null ) {
+         return;
+     }
+     for ( Entry<String, Long> counter : counters.entrySet() ) {
+         String counterName = counter.getKey().toLowerCase();
+         batchIncrementAggregateCounters( m, applicationId, userId, groupId, queueId, category, counterName,
+                 counter.getValue(), timestamp );
+     }
+ }
+
+
+ /**
+ * Increments a single named counter, deriving the counter timestamp as {@code cassandraTimestamp / 1000}.
+ * NOTE(review): presumably converts microseconds to milliseconds - confirm against callers.
+ */
+ public void batchIncrementAggregateCounters( Mutator<ByteBuffer> m, UUID applicationId, UUID userId, UUID groupId,
+ UUID queueId, String category, String name, long value,
+ long cassandraTimestamp ) {
+ batchIncrementAggregateCounters( m, applicationId, userId, groupId, queueId, category, name, value,
+ cassandraTimestamp / 1000, cassandraTimestamp );
+ }
+
+
+ /**
+  * Increments a named counter at every {@link CounterResolution}, then increments the matching entity
+  * counter on the application and, when given, on the user and the group.
+  *
+  * @param counterTimestamp timestamp used for the aggregate counter rows
+  * @param cassandraTimestamp timestamp used for the entity counter writes
+  */
+ public void batchIncrementAggregateCounters( Mutator<ByteBuffer> m, UUID applicationId, UUID userId, UUID groupId,
+                                              UUID queueId, String category, String name, long value,
+                                              long counterTimestamp, long cassandraTimestamp ) {
+     CounterResolution[] resolutions = CounterResolution.values();
+     for ( int r = 0; r < resolutions.length; r++ ) {
+         CounterResolution resolution = resolutions[r];
+         logger.debug( "BIAC for resolution {}", resolution );
+         batchIncrementAggregateCounters( m, userId, groupId, queueId, category, resolution, name, value,
+                 counterTimestamp, applicationId );
+         logger.debug( "DONE BIAC for resolution {}", resolution );
+     }
+
+     // application first, then user and group, skipping whichever of those two is absent
+     batchIncrementEntityCounter( m, applicationId, name, value, cassandraTimestamp, applicationId );
+     for ( UUID ownerId : new UUID[] { userId, groupId } ) {
+         if ( ownerId != null ) {
+             batchIncrementEntityCounter( m, ownerId, name, value, cassandraTimestamp, applicationId );
+         }
+     }
+ }
+
+
+ /**
+  * Writes the aggregate rows for one resolution. The dotted {@code name} is expanded into its prefixes
+  * ("a", "a.b", ...); the prefix "system" is skipped. For each remaining prefix the all-wildcard row is
+  * written, followed by one row for every distinct combination of the non-null user, group, queue and
+  * category dimensions.
+  */
+ private void batchIncrementAggregateCounters( Mutator<ByteBuffer> m, UUID userId, UUID groupId, UUID queueId,
+                                               String category, CounterResolution resolution, String name,
+                                               long value, long counterTimestamp, UUID applicationId ) {
+
+     String[] segments = StringUtils.split( name, '.' );
+     for ( int depth = 1; depth <= segments.length; depth++ ) {
+         String prefix = StringUtils.join( segments, '.', 0, depth );
+         // skip system counter
+         if ( "system".equals( prefix ) ) {
+             continue;
+         }
+
+         // *:*:*:*
+         handleAggregateCounterRow( m,
+                 AggregateCounterSelection.rowBuilder( prefix, null, null, null, null, resolution ),
+                 resolution.round( counterTimestamp ), value, applicationId );
+
+         HashSet<String> written = new HashSet<String>( 16 );
+         // each bit of the mask switches one dimension on; a dimension that is null contributes nothing
+         for ( int mask = 0; mask < 16; mask++ ) {
+             UUID user = ( ( mask & 0x01 ) != 0 ) ? userId : null;
+             UUID group = ( ( mask & 0x02 ) != 0 ) ? groupId : null;
+             UUID queue = ( ( mask & 0x04 ) != 0 ) ? queueId : null;
+             String cat = ( ( mask & 0x08 ) != 0 ) ? category : null;
+
+             String row = AggregateCounterSelection.rowBuilder( prefix, user, group, queue, cat, resolution );
+
+             boolean anyDimension = ( user != null ) || ( group != null ) || ( queue != null ) || ( cat != null );
+             // add() returns false for a row already written under another mask
+             if ( anyDimension && written.add( row ) ) {
+                 handleAggregateCounterRow( m, row, resolution.round( counterTimestamp ), value, applicationId );
+             }
+         }
+     }
+ }
+
+
+ /**
+  * Records one aggregate counter increment. In the "o" and "p" modes the counter column is added to the
+  * mutator (when one is given); in the "n" and "p" modes a {@code Count} keyed by the application-prefixed
+  * row key is handed to the batcher.
+  *
+  * @param m mutator to add the counter to; may be null, in which case the direct write is skipped
+  * @param key aggregate row key
+  * @param column counter column name (the rounded timestamp)
+  * @param value increment
+  * @param applicationId application owning the counter
+  */
+ private void handleAggregateCounterRow( Mutator<ByteBuffer> m, String key, long column, long value,
+                                         UUID applicationId ) {
+     if ( logger.isDebugEnabled() ) {
+         // log at the level the guard checks; this used to call info() behind a debug guard
+         logger.debug( "HACR: aggregateRow for app {} with key {} column {} and value {}",
+                 new Object[] { applicationId, key, column, value } );
+     }
+     if ( "o".equals( counterType ) || "p".equals( counterType ) ) {
+         if ( m != null ) {
+             HCounterColumn<Long> c = createCounterColumn( column, value, le );
+             m.addCounter( bytebuffer( key ), APPLICATION_AGGREGATE_COUNTERS.toString(), c );
+         }
+     }
+     if ( "n".equals( counterType ) || "p".equals( counterType ) ) {
+         // create and add Count
+         PrefixedSerializer ps =
+                 new PrefixedSerializer( applicationId, UUIDSerializer.get(), StringSerializer.get() );
+         batcher.add(
+                 new Count( APPLICATION_AGGREGATE_COUNTERS.toString(), ps.toByteBuffer( key ), column, value ) );
+     }
+ }
+
+
+ /** Factory for {@link AggregateCounterSelection}; passes the arguments straight to its constructor. */
+ public AggregateCounterSelection getAggregateCounterSelection( String name, UUID userId, UUID groupId, UUID queueId,
+ String category ) {
+ return new AggregateCounterSelection( name, userId, groupId, queueId, category );
+ }
+
+
+ /** @return the aggregate row key for the given parts, as built by AggregateCounterSelection.rowBuilder */
+ public String getAggregateCounterRow( String name, UUID userId, UUID groupId, UUID queueId, String category,
+ CounterResolution resolution ) {
+ return AggregateCounterSelection.rowBuilder( name, userId, groupId, queueId, category, resolution );
+ }
+
+
+ public List<String> getAggregateCounterRows( List<AggregateCounterSelection> selections,
+ CounterResolution resolution ) {
+ List<String> keys = new ArrayList<String>();
+ for ( AggregateCounterSelection selection : selections ) {
+ keys.add( selection.getRow( resolution ) );
+ }
+ return keys;
+ }
+
+
+ /**
+  * Increments the counter {@code name} of one entity. The counter name is first recorded in the entity's
+  * counters dictionary; then, depending on the counter type, the counter column is added to the mutator
+  * ("o"/"p") and/or a {@code Count} is handed to the batcher ("n"/"p").
+  *
+  * @return the mutator passed in
+  */
+ private Mutator<ByteBuffer> batchIncrementEntityCounter( Mutator<ByteBuffer> m, UUID entityId, String name,
+                                                          Long value, long timestamp, UUID applicationId ) {
+     if ( logger.isDebugEnabled() ) {
+         logger.debug( "BIEC: Incrementing property {} of entity {} by value {}",
+                 new Object[] { name, entityId, value } );
+     }
+
+     addInsertToMutator( m, ENTITY_DICTIONARIES, key( entityId, DICTIONARY_COUNTERS ), name, null, timestamp );
+
+     boolean parallel = "p".equals( counterType );
+     if ( parallel || "o".equals( counterType ) ) {
+         HCounterColumn<String> counterColumn = createCounterColumn( name, value );
+         m.addCounter( bytebuffer( entityId ), ENTITY_COUNTERS.toString(), counterColumn );
+     }
+     if ( parallel || "n".equals( counterType ) ) {
+         PrefixedSerializer ps = new PrefixedSerializer( applicationId, UUIDSerializer.get(), UUIDSerializer.get() );
+         batcher.add( new Count( ENTITY_COUNTERS.toString(), ps.toByteBuffer( entityId ), name, value ) );
+     }
+     return m;
+ }
+
+
+ /**
+  * Increments the counter {@code name} of one queue. The counter name is first recorded (with an empty
+  * value) in the queue's counters dictionary; then, depending on the counter type, the counter column is
+  * added to the mutator ("o"/"p") and/or a {@code Count} is handed to the batcher ("n"/"p").
+  *
+  * @return the mutator passed in
+  */
+ public Mutator<ByteBuffer> batchIncrementQueueCounter( Mutator<ByteBuffer> m, UUID queueId, String name, long value,
+                                                        long timestamp, UUID applicationId ) {
+     if ( logger.isDebugEnabled() ) {
+         logger.debug( "BIQC: Incrementing property {} of queue {} by value {}",
+                 new Object[] { name, queueId, value } );
+     }
+
+     m.addInsertion( bytebuffer( key( queueId, DICTIONARY_COUNTERS ).toString() ),
+             QueuesCF.QUEUE_DICTIONARIES.toString(),
+             createColumn( name, ByteBuffer.allocate( 0 ), timestamp, se, be ) );
+
+     boolean parallel = "p".equals( counterType );
+     if ( parallel || "o".equals( counterType ) ) {
+         HCounterColumn<String> counterColumn = createCounterColumn( name, value );
+         m.addCounter( bytebuffer( queueId ), QueuesCF.COUNTERS.toString(), counterColumn );
+     }
+     if ( parallel || "n".equals( counterType ) ) {
+         PrefixedSerializer ps = new PrefixedSerializer( applicationId, UUIDSerializer.get(), UUIDSerializer.get() );
+         batcher.add( new Count( QueuesCF.COUNTERS.toString(), ps.toByteBuffer( queueId ), name, value ) );
+     }
+     return m;
+ }
+
+
+ /**
+  * Increments several counters of one queue.
+  *
+  * @param values increment per counter name
+  * @return the mutator passed in
+  */
+ public Mutator<ByteBuffer> batchIncrementQueueCounters( Mutator<ByteBuffer> m, UUID queueId,
+                                                         Map<String, Long> values, long timestamp,
+                                                         UUID applicationId ) {
+     for ( Entry<String, Long> counter : values.entrySet() ) {
+         String counterName = counter.getKey();
+         batchIncrementQueueCounter( m, queueId, counterName, counter.getValue(), timestamp, applicationId );
+     }
+     return m;
+ }
+
+
+ /**
+  * Increments counters of several queues.
+  *
+  * @param values for each queue id, the increment per counter name
+  * @return the mutator passed in
+  */
+ public Mutator<ByteBuffer> batchIncrementQueueCounters( Mutator<ByteBuffer> m, Map<UUID, Map<String, Long>> values,
+                                                         long timestamp, UUID applicationId ) {
+     for ( Entry<UUID, Map<String, Long>> perQueue : values.entrySet() ) {
+         UUID queueId = perQueue.getKey();
+         batchIncrementQueueCounters( m, queueId, perQueue.getValue(), timestamp, applicationId );
+     }
+     return m;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CursorCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CursorCache.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CursorCache.java
new file mode 100644
index 0000000..b49f0fe
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CursorCache.java
@@ -0,0 +1,120 @@
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static java.lang.Integer.parseInt;
+import static org.apache.commons.codec.binary.Base64.decodeBase64;
+import static org.apache.commons.codec.binary.Base64.encodeBase64URLSafeString;
+import static org.apache.commons.lang.StringUtils.split;
+import static org.apache.usergrid.utils.ConversionUtils.bytes;
+
+
+/**
+ * Internal cursor parsing.
+ * <p>
+ * Keeps one cursor (a {@link ByteBuffer}) per slice, keyed by the slice's hash code, and
+ * converts the whole set to and from a single string token. The token is the URL-safe Base64
+ * encoding of {@code hash:base64(cursorBytes)} entries joined by {@code '|'}.
+ *
+ * @author tnine
+ */
+public class CursorCache {
+
+    /**
+     * Charset used when converting the cursor token between text and bytes. It is pinned
+     * explicitly so the token does not depend on the JVM's platform default charset.
+     */
+    private static final java.nio.charset.Charset CURSOR_CHARSET = java.nio.charset.Charset.forName( "UTF-8" );
+
+    /** Cursor bytes keyed by slice hash code. */
+    private final Map<Integer, ByteBuffer> cursors = new HashMap<Integer, ByteBuffer>();
+
+
+    /** Create an empty cursor cache. */
+    public CursorCache() {
+
+    }
+
+
+    /**
+     * Create a new cursor cache from the string if passed.
+     *
+     * @param cursorString a token as produced by {@link #asString()}; {@code null}, or a token
+     * whose decoded form contains no {@code ':'}, yields an empty cache
+     *
+     * @throws NumberFormatException if an entry's hash part is not a valid integer
+     */
+    public CursorCache( String cursorString ) {
+
+        if ( cursorString == null ) {
+            return;
+        }
+
+        String decoded = new String( decodeBase64( cursorString ), CURSOR_CHARSET );
+
+        // nothing to do
+        if ( decoded.indexOf( ':' ) < 0 ) {
+            return;
+        }
+
+        for ( String token : split( decoded, '|' ) ) {
+
+            String[] parts = split( token, ':' );
+
+            if ( parts.length < 1 ) {
+                continue;
+            }
+
+            int hashCode = parseInt( parts[0] );
+
+            ByteBuffer cursorBytes;
+
+            if ( parts.length == 2 ) {
+                cursorBytes = ByteBuffer.wrap( decodeBase64( parts[1] ) );
+            }
+            else {
+                // no payload after the hash: the slice is recorded with an empty cursor
+                cursorBytes = ByteBuffer.allocate( 0 );
+            }
+
+            cursors.put( hashCode, cursorBytes );
+        }
+    }
+
+
+    /** Set the cursor with the given hash and the new byte buffer */
+    public void setNextCursor( int sliceHash, ByteBuffer newCursor ) {
+        cursors.put( sliceHash, newCursor );
+    }
+
+
+    /**
+     * Get the cursor by the hashcode of the slice.
+     *
+     * @return the stored buffer, or {@code null} if nothing is stored for that hash
+     */
+    public ByteBuffer getCursorBytes( int sliceHash ) {
+        return cursors.get( sliceHash );
+    }
+
+
+    /**
+     * Turn the cursor cache into a string.
+     *
+     * @return the encoded token, or {@code null} when the cache holds no cursors or every
+     * stored cursor is {@code null} or has no remaining bytes
+     */
+    public String asString() {
+        // No cursors to return
+        if ( cursors.isEmpty() ) {
+            return null;
+        }
+
+        StringBuilder buff = new StringBuilder();
+
+        int nullCount = 0;
+
+        for ( Entry<Integer, ByteBuffer> entry : cursors.entrySet() ) {
+            ByteBuffer value = entry.getValue();
+
+            buff.append( entry.getKey() );
+            buff.append( ":" );
+
+            // A null cursor is written with an empty payload (and counted as empty below)
+            // instead of being handed to the byte conversion.
+            if ( value != null ) {
+                buff.append( encodeBase64URLSafeString( bytes( value ) ) );
+            }
+
+            buff.append( "|" );
+
+            // this range was empty, mark it as a null
+            if ( value == null || value.remaining() == 0 ) {
+                nullCount++;
+            }
+        }
+
+        // all cursors are complete, return null
+        if ( nullCount == cursors.size() ) {
+            return null;
+        }
+
+        // trim off the last pipe
+        buff.setLength( buff.length() - 1 );
+
+        return encodeBase64URLSafeString( buff.toString().getBytes( CURSOR_CHARSET ) );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
new file mode 100644
index 0000000..51dbc9e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
@@ -0,0 +1,408 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.apache.commons.lang.StringUtils;
+import org.apache.usergrid.persistence.DynamicEntity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.entities.Application;
+import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsException;
+import org.apache.usergrid.utils.UUIDUtils;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.yammer.metrics.annotation.Metered;
+
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.beans.ColumnSlice;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.beans.OrderedRows;
+import me.prettyprint.hector.api.beans.Row;
+import me.prettyprint.hector.api.beans.Rows;
+import me.prettyprint.hector.api.mutation.Mutator;
+import me.prettyprint.hector.api.query.QueryResult;
+import me.prettyprint.hector.api.query.RangeSlicesQuery;
+import static java.lang.String.CASE_INSENSITIVE_ORDER;
+import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static me.prettyprint.hector.api.factory.HFactory.createRangeSlicesQuery;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID;
+import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.asMap;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
+import static org.apache.usergrid.persistence.cassandra.CassandraService.APPLICATIONS_CF;
+import static org.apache.usergrid.persistence.cassandra.CassandraService.PROPERTIES_CF;
+import static org.apache.usergrid.persistence.cassandra.CassandraService.RETRY_COUNT;
+import static org.apache.usergrid.utils.ConversionUtils.uuid;
+
+
+/**
+ * Cassandra-specific implementation of Datastore
+ * <p>
+ * Hands out {@link EntityManager} instances per application id (held in a bounded cache),
+ * creates and looks up applications in the system keyspace, and reads and writes the
+ * service-wide properties row.
+ *
+ * @author edanuff
+ */
+public class EntityManagerFactoryImpl implements EntityManagerFactory, ApplicationContextAware {
+
+    private static final Logger logger = LoggerFactory.getLogger( EntityManagerFactoryImpl.class );
+
+    public static String IMPLEMENTATION_DESCRIPTION = "Cassandra Entity Manager Factory 1.0";
+
+    public static final Class<DynamicEntity> APPLICATION_ENTITY_CLASS = DynamicEntity.class;
+
+    public static final StringSerializer se = new StringSerializer();
+    public static final ByteBufferSerializer be = new ByteBufferSerializer();
+    public static final UUIDSerializer ue = new UUIDSerializer();
+    public static final BytesArraySerializer bae = new BytesArraySerializer();
+    public static final DynamicCompositeSerializer dce = new DynamicCompositeSerializer();
+    public static final LongSerializer le = new LongSerializer();
+
+    ApplicationContext applicationContext;
+
+    CassandraService cass;
+    CounterUtils counterUtils;
+
+    private boolean skipAggregateCounters;
+
+    /** Entity managers keyed by application id; holds at most 100 entries. */
+    private LoadingCache<UUID, EntityManager> entityManagers =
+            CacheBuilder.newBuilder().maximumSize( 100 ).build( new CacheLoader<UUID, EntityManager>() {
+                public EntityManager load( UUID appId ) { // no checked exception
+                    return _getEntityManager( appId );
+                }
+            } );
+
+
+    /**
+     * Must be constructed with a CassandraService.
+     *
+     * @param cass the cassandraService instance
+     * @param counterUtils the counter helper stored on this factory
+     * @param skipAggregateCounters when true, a warning is logged that counters are disabled by
+     * configuration
+     */
+    public EntityManagerFactoryImpl( CassandraService cass, CounterUtils counterUtils, boolean skipAggregateCounters ) {
+        this.cass = cass;
+        this.counterUtils = counterUtils;
+        this.skipAggregateCounters = skipAggregateCounters;
+        if ( skipAggregateCounters ) {
+            logger.warn( "NOTE: Counters have been disabled by configuration..." );
+        }
+    }
+
+
+    /** Returns {@link #IMPLEMENTATION_DESCRIPTION}. */
+    @Override
+    public String getImpementationDescription() {
+        return IMPLEMENTATION_DESCRIPTION;
+    }
+
+
+    /**
+     * Returns the entity manager for the given application, using the cache when possible.
+     * If the cache lookup fails, the failure is logged and a manager is built directly.
+     *
+     * @param applicationId the application the manager is bound to
+     *
+     * @return the entity manager for that application
+     */
+    @Override
+    public EntityManager getEntityManager( UUID applicationId ) {
+        try {
+            return entityManagers.get( applicationId );
+        }
+        catch ( Exception ex ) {
+            // Report through the logger (with stack trace) rather than stderr, then fall
+            // through to the uncached path below.
+            logger.error( "Unable to load entity manager for application " + applicationId + " from cache", ex );
+        }
+        return _getEntityManager( applicationId );
+    }
+
+
+    /** Fetches the "entityManager" bean from the application context and binds it to the application id. */
+    private EntityManager _getEntityManager( UUID applicationId ) {
+        EntityManager em = applicationContext.getBean( "entityManager", EntityManager.class );
+        em.setApplicationId( applicationId );
+        return em;
+    }
+
+
+    public ApplicationContext getApplicationContext() {
+        return applicationContext;
+    }
+
+
+    /**
+     * Gets the setup.
+     *
+     * @return Setup helper
+     */
+    public Setup getSetup() {
+        return new Setup( this, cass );
+    }
+
+
+    /**
+     * Runs the {@link Setup} helper and then, if the Cassandra service exposes a properties
+     * map, writes it as the service properties.
+     */
+    @Override
+    public void setup() throws Exception {
+        Setup setup = getSetup();
+
+        setup.setup();
+
+
+        if ( cass.getPropertiesMap() != null ) {
+            updateServiceProperties( cass.getPropertiesMap() );
+        }
+    }
+
+
+    /**
+     * Creates an application without initial properties.
+     *
+     * @see #createApplication(String, String, Map)
+     */
+    @Override
+    public UUID createApplication( String organization, String name ) throws Exception {
+        return createApplication( organization, name, null );
+    }
+
+
+    /**
+     * Creates a new application under a freshly generated time UUID.
+     *
+     * @param organizationName the owning organization; used as the name prefix when
+     * {@code name} does not already contain a '/'
+     * @param name the application name
+     * @param properties initial application properties, may be null
+     *
+     * @return the id of the new application
+     *
+     * @throws ApplicationAlreadyExistsException if an application with that name is already
+     * registered
+     */
+    @Override
+    public UUID createApplication( String organizationName, String name, Map<String, Object> properties )
+            throws Exception {
+
+        String appName = buildAppName( organizationName, name );
+
+        HColumn<String, ByteBuffer> column =
+                cass.getColumn( cass.getSystemKeyspace(), APPLICATIONS_CF, appName, PROPERTY_UUID );
+        if ( column != null ) {
+            throw new ApplicationAlreadyExistsException( name );
+        }
+
+        UUID applicationId = UUIDUtils.newTimeUUID();
+        logger.info( "New application id " + applicationId.toString() );
+
+        initializeApplication( organizationName, applicationId, appName, properties );
+
+        return applicationId;
+    }
+
+
+    /**
+     * Builds the lower-cased application name: {@code name} as given when it already
+     * contains a '/', otherwise {@code organizationName + "/" + name}.
+     */
+    private String buildAppName( String organizationName, String name ) {
+        return StringUtils.lowerCase( name.contains( "/" ) ? name : organizationName + "/" + name );
+    }
+
+
+    /**
+     * Registers an application under the given id: sets up its keyspace, writes its uuid and
+     * name columns to the applications column family, creates the application entity and
+     * resets its roles.
+     * <p>
+     * Note that the name property is put into the supplied {@code properties} map, so a
+     * caller-provided map is modified.
+     *
+     * @param organizationName the owning organization
+     * @param applicationId the id to register the application under
+     * @param name the application name
+     * @param properties initial application properties; when null a new case-insensitive map
+     * is used
+     *
+     * @return {@code applicationId}
+     *
+     * @throws ApplicationAlreadyExistsException if the name is already registered
+     */
+    public UUID initializeApplication( String organizationName, UUID applicationId, String name,
+                                       Map<String, Object> properties ) throws Exception {
+
+        String appName = buildAppName( organizationName, name );
+        // check for pre-existing
+        if ( lookupApplication( appName ) != null ) {
+            throw new ApplicationAlreadyExistsException( appName );
+        }
+        if ( properties == null ) {
+            properties = new TreeMap<String, Object>( CASE_INSENSITIVE_ORDER );
+        }
+
+        properties.put( PROPERTY_NAME, appName );
+
+        getSetup().setupApplicationKeyspace( applicationId, appName );
+
+
+        Keyspace ko = cass.getSystemKeyspace();
+        Mutator<ByteBuffer> m = createMutator( ko, be );
+
+        long timestamp = cass.createTimestamp();
+
+        addInsertToMutator( m, APPLICATIONS_CF, appName, PROPERTY_UUID, applicationId, timestamp );
+        addInsertToMutator( m, APPLICATIONS_CF, appName, PROPERTY_NAME, appName, timestamp );
+
+        batchExecute( m, RETRY_COUNT );
+
+        EntityManager em = getEntityManager( applicationId );
+        em.create( TYPE_APPLICATION, APPLICATION_ENTITY_CLASS, properties );
+
+        em.resetRoles();
+
+        return applicationId;
+    }
+
+
+    /**
+     * Registers an application under a caller-supplied id.
+     *
+     * @throws ApplicationAlreadyExistsException if an application with that name is already
+     * registered
+     */
+    @Override
+    public UUID importApplication( String organizationName, UUID applicationId, String name,
+                                   Map<String, Object> properties ) throws Exception {
+
+        name = buildAppName( organizationName, name );
+
+        HColumn<String, ByteBuffer> column =
+                cass.getColumn( cass.getSystemKeyspace(), APPLICATIONS_CF, name, PROPERTY_UUID );
+        if ( column != null ) {
+            throw new ApplicationAlreadyExistsException( name );
+        }
+
+        return initializeApplication( organizationName, applicationId, name, properties );
+    }
+
+
+    /**
+     * Looks up an application id by name; the name is lower-cased before the lookup.
+     *
+     * @param name the application name
+     *
+     * @return the application id, or null when no uuid column is stored for that name
+     */
+    @Override
+    @Metered(group = "core", name = "EntityManagerFactory_lookupApplication_byName")
+    public UUID lookupApplication( String name ) throws Exception {
+        name = name.toLowerCase();
+        HColumn<String, ByteBuffer> column =
+                cass.getColumn( cass.getSystemKeyspace(), APPLICATIONS_CF, name, PROPERTY_UUID );
+        if ( column == null ) {
+            return null;
+        }
+        return uuid( column.getValue() );
+    }
+
+
+    /**
+     * Gets the application.
+     *
+     * @param name the name
+     *
+     * @return application for name, or null when no uuid column is stored for that name
+     *
+     * @throws Exception the exception
+     */
+    @Metered(group = "core", name = "EntityManagerFactory_getApplication")
+    public Application getApplication( String name ) throws Exception {
+        name = name.toLowerCase();
+        HColumn<String, ByteBuffer> column =
+                cass.getColumn( cass.getSystemKeyspace(), APPLICATIONS_CF, name, PROPERTY_UUID );
+        if ( column == null ) {
+            return null;
+        }
+
+        UUID applicationId = uuid( column.getValue() );
+
+        EntityManager em = getEntityManager( applicationId );
+        return ( ( EntityManagerImpl ) em ).getEntity( applicationId, Application.class );
+    }
+
+
+    /**
+     * Lists applications by scanning the applications column family (the query asks for at
+     * most 10000 rows).
+     *
+     * @return application name to application id, ordered case-insensitively by name
+     */
+    @Override
+    public Map<String, UUID> getApplications() throws Exception {
+        Map<String, UUID> applications = new TreeMap<String, UUID>( CASE_INSENSITIVE_ORDER );
+        Keyspace ko = cass.getSystemKeyspace();
+        RangeSlicesQuery<String, String, UUID> q = createRangeSlicesQuery( ko, se, se, ue );
+        q.setKeys( "", "\uFFFF" );
+        q.setColumnFamily( APPLICATIONS_CF );
+        q.setColumnNames( PROPERTY_UUID );
+        q.setRowCount( 10000 );
+        QueryResult<OrderedRows<String, String, UUID>> r = q.execute();
+        Rows<String, String, UUID> rows = r.get();
+        for ( Row<String, String, UUID> row : rows ) {
+            ColumnSlice<String, UUID> slice = row.getColumnSlice();
+            HColumn<String, UUID> column = slice.getColumnByName( PROPERTY_UUID );
+            if ( column == null ) {
+                // Row came back without a uuid column (presumably a deleted row still visible
+                // to the range scan); there is no id to report, so skip it.
+                continue;
+            }
+            applications.put( row.getKey(), column.getValue() );
+        }
+        return applications;
+    }
+
+
+    /**
+     * Stores a single service property.
+     *
+     * @return true on success; false if the write failed (the failure is logged)
+     */
+    @Override
+    public boolean setServiceProperty( String name, String value ) {
+        try {
+            cass.setColumn( cass.getSystemKeyspace(), PROPERTIES_CF, PROPERTIES_CF, name, value );
+            return true;
+        }
+        catch ( Exception e ) {
+            logger.error( "Unable to set property " + name + ": " + e.getMessage(), e );
+        }
+        return false;
+    }
+
+
+    /**
+     * Deletes a single service property.
+     *
+     * @return true on success; false if the delete failed (the failure is logged)
+     */
+    @Override
+    public boolean deleteServiceProperty( String name ) {
+        try {
+            cass.deleteColumn( cass.getSystemKeyspace(), PROPERTIES_CF, PROPERTIES_CF, name );
+            return true;
+        }
+        catch ( Exception e ) {
+            logger.error( "Unable to delete property " + name + ": " + e.getMessage(), e );
+        }
+        return false;
+    }
+
+
+    /**
+     * Writes all given properties to the service properties row.
+     *
+     * @return true on success; false if the write failed (the failure is logged)
+     */
+    @Override
+    public boolean updateServiceProperties( Map<String, String> properties ) {
+        try {
+            cass.setColumns( cass.getSystemKeyspace(), PROPERTIES_CF, PROPERTIES_CF.getBytes(), properties );
+            return true;
+        }
+        catch ( Exception e ) {
+            logger.error( "Unable to update properties: " + e.getMessage(), e );
+        }
+        return false;
+    }
+
+
+    /**
+     * Reads all service properties.
+     *
+     * @return the properties as a map, or null if they could not be loaded (the failure is
+     * logged)
+     */
+    @Override
+    public Map<String, String> getServiceProperties() {
+        try {
+            return asMap( cass.getAllColumns( cass.getSystemKeyspace(), PROPERTIES_CF, PROPERTIES_CF, se, se ) );
+        }
+        catch ( Exception e ) {
+            logger.error( "Unable to load properties: " + e.getMessage(), e );
+        }
+        return null;
+    }
+
+
+    @Override
+    public void setApplicationContext( ApplicationContext applicationContext ) throws BeansException {
+        this.applicationContext = applicationContext;
+    }
+
+
+    public void setCounterUtils( CounterUtils counterUtils ) {
+        this.counterUtils = counterUtils;
+    }
+}