You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/01/28 23:21:46 UTC
[36/96] [abbrv] [partial] Change package namespace to
org.apache.usergrid
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityValueSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityValueSerializer.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityValueSerializer.java
new file mode 100644
index 0000000..49f991f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityValueSerializer.java
@@ -0,0 +1,51 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import me.prettyprint.cassandra.serializers.AbstractSerializer;
+import me.prettyprint.hector.api.beans.DynamicComposite;
+import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
+
+
+/**
+ * Write-only serializer for entity values that are being sent to the database.
+ *
+ * Lists and object arrays are encoded through {@link DynamicComposite}; every other value is handed to
+ * {@code ConversionUtils.bytebuffer}. Decoding is deliberately unsupported.
+ */
+public class EntityValueSerializer extends AbstractSerializer<Object> {
+
+    /**
+     * Encodes the given value as a {@link ByteBuffer}.
+     *
+     * @param obj the value to encode; a {@link List} or {@code Object[]} is written as a dynamic composite
+     * @return the encoded bytes, as produced by the delegate that matches the runtime type of {@code obj}
+     */
+    @Override
+    public ByteBuffer toByteBuffer( Object obj ) {
+        if ( obj instanceof List ) {
+            return DynamicComposite.toByteBuffer( ( List<?> ) obj );
+        }
+        if ( obj instanceof Object[] ) {
+            return DynamicComposite.toByteBuffer( ( Object[] ) obj );
+        }
+        // Anything that is not a list or an array goes through the generic conversion helper.
+        return bytebuffer( obj );
+    }
+
+
+    /**
+     * Always fails: this serializer is only meant for the write path.
+     *
+     * @param byteBuffer ignored
+     * @return never returns normally
+     * @throws IllegalStateException on every call
+     */
+    @Override
+    public Object fromByteBuffer( ByteBuffer byteBuffer ) {
+        throw new IllegalStateException(
+                "The entity value serializer can only be used for data going to the database, "
+                        + "and not data coming from the database" );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java
new file mode 100644
index 0000000..bd470de
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java
@@ -0,0 +1,322 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.IndexBucketLocator;
+import org.apache.usergrid.persistence.IndexBucketLocator.IndexType;
+import org.apache.usergrid.persistence.geo.EntityLocationRef;
+import org.apache.usergrid.persistence.geo.GeocellManager;
+import org.apache.usergrid.persistence.geo.model.Point;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.beans.DynamicComposite;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.mutation.Mutator;
+import static me.prettyprint.hector.api.factory.HFactory.createColumn;
+import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_GEOCELL;
+import static org.apache.usergrid.persistence.Schema.INDEX_CONNECTIONS;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.logBatchOperation;
+import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
+
+
+public class GeoIndexManager {
+
+ private static final Logger logger = LoggerFactory.getLogger( GeoIndexManager.class );
+
+ /**
+ * We only ever go down to max resolution of 9 because we use bucket hashing. Every level divides the region by
+ * 1/16. Our original "box" is 90 degrees by 45 degrees. We therefore have 90 * (1/16)^(r-1) and 45 * (1/16)^(r-1)
+ * for our size where r is the largest bucket resolution. This gives us a size of 90 deg => 0.0000000209547 deg =
+ * .2cm and 45 deg => 0.00000001047735 deg = .1 cm
+ */
+ public static final int MAX_RESOLUTION = 9;
+
+
+ EntityManagerImpl em;
+ CassandraService cass;
+
+
+ public GeoIndexManager() {
+ }
+
+
+ public GeoIndexManager init( EntityManagerImpl em ) {
+ this.em = em;
+ this.cass = em.getCass();
+ return this;
+ }
+
+
+ public static Mutator<ByteBuffer> addLocationEntryInsertionToMutator( Mutator<ByteBuffer> m, Object key,
+ EntityLocationRef entry ) {
+
+ DynamicComposite columnName = entry.getColumnName();
+ DynamicComposite columnValue = entry.getColumnValue();
+ long ts = entry.getTimestampInMicros();
+
+ logBatchOperation( "Insert", ENTITY_INDEX, key, columnName, columnValue, ts );
+
+ HColumn<ByteBuffer, ByteBuffer> column =
+ createColumn( columnName.serialize(), columnValue.serialize(), ts, ByteBufferSerializer.get(),
+ ByteBufferSerializer.get() );
+ m.addInsertion( bytebuffer( key ), ENTITY_INDEX.toString(), column );
+
+ return m;
+ }
+
+
+ private static Mutator<ByteBuffer> batchAddConnectionIndexEntries( Mutator<ByteBuffer> m,
+ IndexBucketLocator locator, UUID appId,
+ String propertyName, String geoCell,
+ UUID[] index_keys, ByteBuffer columnName,
+ ByteBuffer columnValue, long timestamp ) {
+
+ // entity_id,prop_name
+ Object property_index_key =
+ key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL, geoCell,
+ locator.getBucket( appId, IndexType.CONNECTION, index_keys[ConnectionRefImpl.ALL], geoCell ) );
+
+ // entity_id,entity_type,prop_name
+ Object entity_type_prop_index_key =
+ key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL,
+ geoCell,
+ locator.getBucket( appId, IndexType.CONNECTION, index_keys[ConnectionRefImpl.BY_ENTITY_TYPE],
+ geoCell ) );
+
+ // entity_id,connection_type,prop_name
+ Object connection_type_prop_index_key =
+ key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, propertyName,
+ DICTIONARY_GEOCELL, geoCell, locator.getBucket( appId, IndexType.CONNECTION,
+ index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], geoCell ) );
+
+ // entity_id,connection_type,entity_type,prop_name
+ Object connection_type_and_entity_type_prop_index_key =
+ key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, propertyName,
+ DICTIONARY_GEOCELL, geoCell, locator.getBucket( appId, IndexType.CONNECTION,
+ index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], geoCell ) );
+
+ // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
+ addInsertToMutator( m, ENTITY_INDEX, property_index_key, columnName, columnValue, timestamp );
+
+ // composite(property_value,connected_entity_id,connection_type,entry_timestamp)
+ addInsertToMutator( m, ENTITY_INDEX, entity_type_prop_index_key, columnName, columnValue, timestamp );
+
+ // composite(property_value,connected_entity_id,entity_type,entry_timestamp)
+ addInsertToMutator( m, ENTITY_INDEX, connection_type_prop_index_key, columnName, columnValue, timestamp );
+
+ // composite(property_value,connected_entity_id,entry_timestamp)
+ addInsertToMutator( m, ENTITY_INDEX, connection_type_and_entity_type_prop_index_key, columnName, columnValue,
+ timestamp );
+
+ return m;
+ }
+
+
+ public static void batchStoreLocationInConnectionsIndex( Mutator<ByteBuffer> m, IndexBucketLocator locator,
+ UUID appId, UUID[] index_keys, String propertyName,
+ EntityLocationRef location ) {
+
+ Point p = location.getPoint();
+ List<String> cells = GeocellManager.generateGeoCell( p );
+
+ ByteBuffer columnName = location.getColumnName().serialize();
+ ByteBuffer columnValue = location.getColumnValue().serialize();
+ long ts = location.getTimestampInMicros();
+ for ( String cell : cells ) {
+ batchAddConnectionIndexEntries( m, locator, appId, propertyName, cell, index_keys, columnName, columnValue,
+ ts );
+ }
+
+ logger.info( "Geocells to be saved for Point({} , {} ) are: {}", new Object[] {
+ location.getLatitude(), location.getLongitude(), cells
+ } );
+ }
+
+
+ private static Mutator<ByteBuffer> addLocationEntryDeletionToMutator( Mutator<ByteBuffer> m, Object key,
+ EntityLocationRef entry ) {
+
+ DynamicComposite columnName = entry.getColumnName();
+ long ts = entry.getTimestampInMicros();
+
+ logBatchOperation( "Delete", ENTITY_INDEX, key, columnName, null, ts );
+
+ m.addDeletion( bytebuffer( key ), ENTITY_INDEX.toString(), columnName.serialize(), ByteBufferSerializer.get(),
+ ts + 1 );
+
+ return m;
+ }
+
+
+ private static Mutator<ByteBuffer> batchDeleteConnectionIndexEntries( Mutator<ByteBuffer> m,
+ IndexBucketLocator locator, UUID appId,
+ String propertyName, String geoCell,
+ UUID[] index_keys, ByteBuffer columnName,
+ long timestamp ) {
+
+ // entity_id,prop_name
+ Object property_index_key =
+ key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL, geoCell,
+ locator.getBucket( appId, IndexType.CONNECTION, index_keys[ConnectionRefImpl.ALL], geoCell ) );
+
+ // entity_id,entity_type,prop_name
+ Object entity_type_prop_index_key =
+ key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL,
+ geoCell,
+ locator.getBucket( appId, IndexType.CONNECTION, index_keys[ConnectionRefImpl.BY_ENTITY_TYPE],
+ geoCell ) );
+
+ // entity_id,connection_type,prop_name
+ Object connection_type_prop_index_key =
+ key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, propertyName,
+ DICTIONARY_GEOCELL, geoCell, locator.getBucket( appId, IndexType.CONNECTION,
+ index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], geoCell ) );
+
+ // entity_id,connection_type,entity_type,prop_name
+ Object connection_type_and_entity_type_prop_index_key =
+ key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, propertyName,
+ DICTIONARY_GEOCELL, geoCell, locator.getBucket( appId, IndexType.CONNECTION,
+ index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], geoCell ) );
+
+ // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
+ m.addDeletion( bytebuffer( property_index_key ), ENTITY_INDEX.toString(), columnName,
+ ByteBufferSerializer.get(), timestamp );
+
+ // composite(property_value,connected_entity_id,connection_type,entry_timestamp)
+ m.addDeletion( bytebuffer( entity_type_prop_index_key ), ENTITY_INDEX.toString(), columnName,
+ ByteBufferSerializer.get(), timestamp );
+
+ // composite(property_value,connected_entity_id,entity_type,entry_timestamp)
+ m.addDeletion( bytebuffer( connection_type_prop_index_key ), ENTITY_INDEX.toString(), columnName,
+ ByteBufferSerializer.get(), timestamp );
+
+ // composite(property_value,connected_entity_id,entry_timestamp)
+ m.addDeletion( bytebuffer( connection_type_and_entity_type_prop_index_key ), ENTITY_INDEX.toString(),
+ columnName, ByteBufferSerializer.get(), timestamp );
+
+ return m;
+ }
+
+
+ public static void batchDeleteLocationInConnectionsIndex( Mutator<ByteBuffer> m, IndexBucketLocator locator,
+ UUID appId, UUID[] index_keys, String propertyName,
+ EntityLocationRef location ) {
+
+ Point p = location.getPoint();
+ List<String> cells = GeocellManager.generateGeoCell( p );
+
+ ByteBuffer columnName = location.getColumnName().serialize();
+
+ long ts = location.getTimestampInMicros();
+
+ for ( String cell : cells ) {
+
+ batchDeleteConnectionIndexEntries( m, locator, appId, propertyName, cell, index_keys, columnName, ts );
+ }
+
+ logger.info( "Geocells to be saved for Point({} , {} ) are: {}", new Object[] {
+ location.getLatitude(), location.getLongitude(), cells
+ } );
+ }
+
+
+ public static void batchStoreLocationInCollectionIndex( Mutator<ByteBuffer> m, IndexBucketLocator locator,
+ UUID appId, Object key, UUID entityId,
+ EntityLocationRef location ) {
+
+ Point p = location.getPoint();
+ List<String> cells = GeocellManager.generateGeoCell( p );
+
+ for ( int i = 0; i < MAX_RESOLUTION; i++ ) {
+ String cell = cells.get( i );
+
+ String indexBucket = locator.getBucket( appId, IndexType.GEO, entityId, cell );
+
+ addLocationEntryInsertionToMutator( m, key( key, DICTIONARY_GEOCELL, cell, indexBucket ), location );
+ }
+
+ if ( logger.isInfoEnabled() ) {
+ logger.info( "Geocells to be saved for Point({},{}) are: {}", new Object[] {
+ location.getLatitude(), location.getLongitude(), cells
+ } );
+ }
+ }
+
+
+ public void storeLocationInCollectionIndex( EntityRef owner, String collectionName, UUID entityId,
+ String propertyName, EntityLocationRef location ) {
+
+ Keyspace ko = cass.getApplicationKeyspace( em.getApplicationId() );
+ Mutator<ByteBuffer> m = createMutator( ko, ByteBufferSerializer.get() );
+
+ batchStoreLocationInCollectionIndex( m, em.getIndexBucketLocator(), em.getApplicationId(),
+ key( owner.getUuid(), collectionName, propertyName ), owner.getUuid(), location );
+
+ batchExecute( m, CassandraService.RETRY_COUNT );
+ }
+
+
+ public static void batchRemoveLocationFromCollectionIndex( Mutator<ByteBuffer> m, IndexBucketLocator locator,
+ UUID appId, Object key, EntityLocationRef location ) {
+
+ Point p = location.getPoint();
+ List<String> cells = GeocellManager.generateGeoCell( p );
+
+ // delete for every bucket in every resolution
+ for ( int i = 0; i < MAX_RESOLUTION; i++ ) {
+
+ String cell = cells.get( i );
+
+ for ( String indexBucket : locator.getBuckets( appId, IndexType.GEO, cell ) ) {
+
+ addLocationEntryDeletionToMutator( m, key( key, DICTIONARY_GEOCELL, cell, indexBucket ), location );
+ }
+ }
+
+ if ( logger.isInfoEnabled() ) {
+ logger.info( "Geocells to be deleted for Point({},{}) are: {}", new Object[] {
+ location.getLatitude(), location.getLongitude(), cells
+ } );
+ }
+ }
+
+
+ public void removeLocationFromCollectionIndex( EntityRef owner, String collectionName, String propertyName,
+ EntityLocationRef location ) {
+
+ Keyspace ko = cass.getApplicationKeyspace( em.getApplicationId() );
+ Mutator<ByteBuffer> m = createMutator( ko, ByteBufferSerializer.get() );
+
+ batchRemoveLocationFromCollectionIndex( m, em.getIndexBucketLocator(), em.getApplicationId(),
+ key( owner.getUuid(), collectionName, propertyName ), location );
+
+ batchExecute( m, CassandraService.RETRY_COUNT );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/IndexUpdate.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/IndexUpdate.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/IndexUpdate.java
new file mode 100644
index 0000000..7e57f63
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/IndexUpdate.java
@@ -0,0 +1,448 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.Entity;
+import org.codehaus.jackson.JsonNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.uuid.UUIDComparator;
+
+import me.prettyprint.hector.api.beans.DynamicComposite;
+import me.prettyprint.hector.api.mutation.Mutator;
+import static java.nio.ByteBuffer.wrap;
+import static java.util.Arrays.asList;
+import static org.apache.usergrid.utils.JsonUtils.toJsonNode;
+import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMicros;
+
+
+public class IndexUpdate {
+
+ private static final Logger logger = LoggerFactory.getLogger( IndexUpdate.class );
+
+ public static final byte VALUE_CODE_BYTES = 0;
+ public static final byte VALUE_CODE_UTF8 = 1;
+ public static final byte VALUE_CODE_UUID = 2;
+ public static final byte VALUE_CODE_INT = 3;
+ public static final byte VALUE_CODE_MAX = 127;
+
+ public static int INDEX_STRING_VALUE_LENGTH = 1024;
+
+ private Mutator<ByteBuffer> batch;
+ private Entity entity;
+ private String entryName;
+ private Object entryValue;
+ private final List<IndexEntry> prevEntries = new ArrayList<IndexEntry>();
+ private final List<IndexEntry> newEntries = new ArrayList<IndexEntry>();
+ private final Set<String> indexesSet = new LinkedHashSet<String>();
+ private boolean schemaHasProperty;
+ private boolean isMultiValue;
+ private boolean removeListEntry;
+ private long timestamp;
+ private final UUID timestampUuid;
+ private UUID associatedId;
+
+
+ public IndexUpdate( Mutator<ByteBuffer> batch, Entity entity, String entryName, Object entryValue,
+ boolean schemaHasProperty, boolean isMultiValue, boolean removeListEntry, UUID timestampUuid ) {
+ this.batch = batch;
+ this.entity = entity;
+ this.entryName = entryName;
+ this.entryValue = entryValue;
+ this.schemaHasProperty = schemaHasProperty;
+ this.isMultiValue = isMultiValue;
+ this.removeListEntry = removeListEntry;
+ timestamp = getTimestampInMicros( timestampUuid );
+ this.timestampUuid = timestampUuid;
+ }
+
+
+ public Mutator<ByteBuffer> getBatch() {
+ return batch;
+ }
+
+
+ public void setBatch( Mutator<ByteBuffer> batch ) {
+ this.batch = batch;
+ }
+
+
+ public Entity getEntity() {
+ return entity;
+ }
+
+
+ public void setEntity( Entity entity ) {
+ this.entity = entity;
+ }
+
+
+ public UUID getId() {
+ if ( associatedId != null ) {
+ return associatedId;
+ }
+ return entity.getUuid();
+ }
+
+
+ public String getEntryName() {
+ return entryName;
+ }
+
+
+ public void setEntryName( String entryName ) {
+ this.entryName = entryName;
+ }
+
+
+ public Object getEntryValue() {
+ return entryValue;
+ }
+
+
+ public void setEntryValue( Object entryValue ) {
+ this.entryValue = entryValue;
+ }
+
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+
+ public void setTimestamp( long timestamp ) {
+ this.timestamp = timestamp;
+ }
+
+
+ public UUID getTimestampUuid() {
+ return timestampUuid;
+ }
+
+
+ public List<IndexEntry> getPrevEntries() {
+ return prevEntries;
+ }
+
+
+ public void addPrevEntry( String path, Object value, UUID timestamp, ByteBuffer ledgerValue ) {
+ IndexEntry entry = new IndexEntry( path, value, timestamp, ledgerValue );
+ prevEntries.add( entry );
+ }
+
+
+ public List<IndexEntry> getNewEntries() {
+ return newEntries;
+ }
+
+
+ public void addNewEntry( String path, Object value ) {
+ IndexEntry entry = new IndexEntry( path, value, timestampUuid, null );
+ newEntries.add( entry );
+ }
+
+
+ public Set<String> getIndexesSet() {
+ return indexesSet;
+ }
+
+
+ public void addIndex( String index ) {
+ logger.debug( "Indexing {}", index );
+ indexesSet.add( index );
+ }
+
+
+ public boolean isSchemaHasProperty() {
+ return schemaHasProperty;
+ }
+
+
+ public void setSchemaHasProperty( boolean schemaHasProperty ) {
+ this.schemaHasProperty = schemaHasProperty;
+ }
+
+
+ public boolean isMultiValue() {
+ return isMultiValue;
+ }
+
+
+ public void setMultiValue( boolean isMultiValue ) {
+ this.isMultiValue = isMultiValue;
+ }
+
+
+ public boolean isRemoveListEntry() {
+ return removeListEntry;
+ }
+
+
+ public void setRemoveListEntry( boolean removeListEntry ) {
+ this.removeListEntry = removeListEntry;
+ }
+
+
+ public void setAssociatedId( UUID associatedId ) {
+ this.associatedId = associatedId;
+ }
+
+
+ public UUID getAssociatedId() {
+ return associatedId;
+ }
+
+
+ public class IndexEntry {
+ private final byte code;
+ private String path;
+ private final Object value;
+ private final UUID timestampUuid;
+ private final ByteBuffer ledgerColumn;
+
+
+ public IndexEntry( String path, Object value, UUID timestampUuid, ByteBuffer ledgerColumn ) {
+ this.path = path;
+ this.value = value;
+ code = indexValueCode( value );
+ this.timestampUuid = timestampUuid;
+ this.ledgerColumn = ledgerColumn;
+ }
+
+
+ public String getPath() {
+ return path;
+ }
+
+
+ public void setPath( String path ) {
+ this.path = path;
+ }
+
+
+ public Object getValue() {
+ return value;
+ }
+
+
+ public byte getValueCode() {
+ return code;
+ }
+
+
+ public UUID getTimestampUuid() {
+ return timestampUuid;
+ }
+
+
+ public DynamicComposite getIndexComposite() {
+ return new DynamicComposite( code, value, getId(), timestampUuid );
+ }
+
+
+ public DynamicComposite getIndexComposite( Object... ids ) {
+ return new DynamicComposite( code, value, asList( ids ), timestampUuid );
+ }
+
+
+ public ByteBuffer getLedgerColumn() {
+ return this.ledgerColumn;
+ }
+ }
+
+
+ public static class UniqueIndexEntry {
+ private final byte code;
+ private String path;
+ private final Object value;
+
+
+ public UniqueIndexEntry( String path, Object value ) {
+ this.path = path;
+ this.value = value;
+ code = indexValueCode( value );
+ }
+
+
+ public String getPath() {
+ return path;
+ }
+
+
+ public void setPath( String path ) {
+ this.path = path;
+ }
+
+
+ public Object getValue() {
+ return value;
+ }
+
+
+ public byte getValueCode() {
+ return code;
+ }
+
+
+ public DynamicComposite getIndexComposite() {
+ return new DynamicComposite( code, value );
+ }
+ }
+
+
+ private static String prepStringForIndex( String str ) {
+ str = str.trim().toLowerCase();
+ str = str.substring( 0, Math.min( INDEX_STRING_VALUE_LENGTH, str.length() ) );
+ return str;
+ }
+
+
+ /**
+ * @param obj
+ * @return
+ */
+ public static Object toIndexableValue( Object obj ) {
+ if ( obj == null ) {
+ return null;
+ }
+
+ if ( obj instanceof String ) {
+ return prepStringForIndex( ( String ) obj );
+ }
+
+ // UUIDs, and BigIntegers map to Cassandra UTF8Type and IntegerType
+ if ( ( obj instanceof UUID ) || ( obj instanceof BigInteger ) ) {
+ return obj;
+ }
+
+ // For any numeric values, turn them into a long
+ // and make them BigIntegers for IntegerType
+ if ( obj instanceof Number ) {
+ return BigInteger.valueOf( ( ( Number ) obj ).longValue() );
+ }
+
+ if ( obj instanceof Boolean ) {
+ return BigInteger.valueOf( ( ( Boolean ) obj ) ? 1L : 0L );
+ }
+
+ if ( obj instanceof Date ) {
+ return BigInteger.valueOf( ( ( Date ) obj ).getTime() );
+ }
+
+ if ( obj instanceof byte[] ) {
+ return wrap( ( byte[] ) obj );
+ }
+
+ if ( obj instanceof ByteBuffer ) {
+ return obj;
+ }
+
+ JsonNode json = toJsonNode( obj );
+ if ( ( json != null ) && json.isValueNode() ) {
+ if ( json.isBigInteger() ) {
+ return json.getBigIntegerValue();
+ }
+ else if ( json.isNumber() || json.isBoolean() ) {
+ return BigInteger.valueOf( json.getValueAsLong() );
+ }
+ else if ( json.isTextual() ) {
+ return prepStringForIndex( json.getTextValue() );
+ }
+ else if ( json.isBinary() ) {
+ try {
+ return wrap( json.getBinaryValue() );
+ }
+ catch ( IOException e ) {
+ }
+ }
+ }
+
+ return null;
+ }
+
+
+ public static boolean validIndexableValue( Object obj ) {
+ return toIndexableValue( obj ) != null;
+ }
+
+
+ public static boolean validIndexableValueOrJson( Object obj ) {
+ if ( ( obj instanceof Map ) || ( obj instanceof List ) || ( obj instanceof JsonNode ) ) {
+ return true;
+ }
+ return toIndexableValue( obj ) != null;
+ }
+
+
+ public static byte indexValueCode( Object obj ) {
+ obj = toIndexableValue( obj );
+ if ( obj instanceof String ) {
+ return VALUE_CODE_UTF8;
+ }
+ else if ( obj instanceof UUID ) {
+ return VALUE_CODE_UUID;
+ }
+ else if ( obj instanceof BigInteger ) {
+ return VALUE_CODE_INT;
+ }
+ else if ( obj instanceof Number ) {
+ return VALUE_CODE_INT;
+ }
+ else {
+ return VALUE_CODE_BYTES;
+ }
+ }
+
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static int compareIndexedValues( Object o1, Object o2 ) {
+ o1 = toIndexableValue( o1 );
+ o2 = toIndexableValue( o2 );
+ if ( ( o1 == null ) && ( o2 == null ) ) {
+ return 0;
+ }
+ else if ( o1 == null ) {
+ return -1;
+ }
+ else if ( o2 == null ) {
+ return 1;
+ }
+ int c1 = indexValueCode( o1 );
+ int c2 = indexValueCode( o2 );
+ if ( c1 == c2 ) {
+ if ( o1 instanceof UUID ) {
+ UUIDComparator.staticCompare( ( UUID ) o1, ( UUID ) o2 );
+ }
+ else if ( o1 instanceof Comparable ) {
+ return ( ( Comparable ) o1 ).compareTo( o2 );
+ }
+ }
+ return c1 - c2;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
new file mode 100644
index 0000000..2c43b4e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
@@ -0,0 +1,719 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Stack;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Identifier;
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.Schema;
+import org.apache.usergrid.persistence.Query.SortDirection;
+import org.apache.usergrid.persistence.Query.SortPredicate;
+import org.apache.usergrid.persistence.entities.User;
+import org.apache.usergrid.persistence.exceptions.NoFullTextIndexException;
+import org.apache.usergrid.persistence.exceptions.NoIndexException;
+import org.apache.usergrid.persistence.exceptions.PersistenceException;
+import org.apache.usergrid.persistence.query.ir.AllNode;
+import org.apache.usergrid.persistence.query.ir.AndNode;
+import org.apache.usergrid.persistence.query.ir.EmailIdentifierNode;
+import org.apache.usergrid.persistence.query.ir.NameIdentifierNode;
+import org.apache.usergrid.persistence.query.ir.NotNode;
+import org.apache.usergrid.persistence.query.ir.OrNode;
+import org.apache.usergrid.persistence.query.ir.OrderByNode;
+import org.apache.usergrid.persistence.query.ir.QueryNode;
+import org.apache.usergrid.persistence.query.ir.QuerySlice;
+import org.apache.usergrid.persistence.query.ir.SearchVisitor;
+import org.apache.usergrid.persistence.query.ir.SliceNode;
+import org.apache.usergrid.persistence.query.ir.UuidIdentifierNode;
+import org.apache.usergrid.persistence.query.ir.WithinNode;
+import org.apache.usergrid.persistence.query.ir.result.ResultIterator;
+import org.apache.usergrid.persistence.query.ir.result.ResultsLoader;
+import org.apache.usergrid.persistence.query.ir.result.ResultsLoaderFactory;
+import org.apache.usergrid.persistence.query.ir.result.ScanColumn;
+import org.apache.usergrid.persistence.query.tree.AndOperand;
+import org.apache.usergrid.persistence.query.tree.ContainsOperand;
+import org.apache.usergrid.persistence.query.tree.Equal;
+import org.apache.usergrid.persistence.query.tree.EqualityOperand;
+import org.apache.usergrid.persistence.query.tree.GreaterThan;
+import org.apache.usergrid.persistence.query.tree.GreaterThanEqual;
+import org.apache.usergrid.persistence.query.tree.LessThan;
+import org.apache.usergrid.persistence.query.tree.LessThanEqual;
+import org.apache.usergrid.persistence.query.tree.Literal;
+import org.apache.usergrid.persistence.query.tree.NotOperand;
+import org.apache.usergrid.persistence.query.tree.Operand;
+import org.apache.usergrid.persistence.query.tree.OrOperand;
+import org.apache.usergrid.persistence.query.tree.QueryVisitor;
+import org.apache.usergrid.persistence.query.tree.StringLiteral;
+import org.apache.usergrid.persistence.query.tree.WithinOperand;
+import org.apache.usergrid.persistence.schema.CollectionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
+
+
+public class QueryProcessor {
+
+ // Page size hint used by process() when more than one slice is involved; otherwise it caps the hint.
+ private static final int PAGE_SIZE = 1000;
+ private static final Logger logger = LoggerFactory.getLogger( QueryProcessor.class );
+
+ // Default schema, consulted by the index checks in this class.
+ private static final Schema SCHEMA = getDefaultSchema();
+
+ // Collaborators handed in through the constructor.
+ private final CollectionInfo collectionInfo;
+ private final EntityManager em;
+ private final ResultsLoaderFactory loaderFactory;
+
+ // State copied from the Query in setQuery(...); rootNode is the IR tree built by process().
+ private Operand rootOperand;
+ private List<SortPredicate> sorts;
+ private CursorCache cursorCache;
+ private QueryNode rootNode;
+ private String entityType;
+
+ // Result limit, taken from query.getLimit().
+ private int size;
+ private Query query;
+ // Page size returned by getPageSizeHint(...) for every node other than the root node.
+ private int pageSizeHint;
+
+
+    /**
+     * Creates a processor for the given query and immediately builds its IR tree.
+     *
+     * @param query the query to process
+     * @param collectionInfo collection metadata; when null, the schema index check in {@code checkIndexed} is skipped
+     * @param em the entity manager later handed to the results loader
+     * @param loaderFactory factory for the loader used in {@code getResults}
+     * @throws PersistenceException if building the IR tree fails
+     */
+    public QueryProcessor( Query query, CollectionInfo collectionInfo, EntityManager em,
+                           ResultsLoaderFactory loaderFactory ) throws PersistenceException {
+        this.collectionInfo = collectionInfo;
+        this.em = em;
+        this.loaderFactory = loaderFactory;
+
+        setQuery( query );
+
+        // must run last: it reads both the query state and collectionInfo
+        process();
+    }
+
+
+    /** @return the query this processor was last configured with */
+    public Query getQuery() {
+        return this.query;
+    }
+
+
+ /**
+ * Stores the query and refreshes the state copied from it: sort predicates, a new cursor cache built from the
+ * query cursor, the root operand, the entity type and the result limit.
+ *
+ * NOTE(review): this does not rebuild the IR tree; in the visible code process() is only called from the
+ * constructor.
+ *
+ * @param query the query to process; it is dereferenced immediately, so it must not be null
+ */
+ public void setQuery( Query query ) {
+ this.sorts = query.getSortPredicates();
+ this.cursorCache = new CursorCache( query.getCursor() );
+ this.rootOperand = query.getRootOperand();
+ this.entityType = query.getEntityType();
+ this.size = query.getLimit();
+ this.query = query;
+ }
+
+
+    /** @return the collection metadata supplied at construction time, possibly null */
+    public CollectionInfo getCollectionInfo() {
+        return this.collectionInfo;
+    }
+
+
+    /**
+     * Builds the IR tree ({@code rootNode}) for the current query and derives {@code pageSizeHint}.
+     *
+     * The root node is chosen as follows:
+     * <ol>
+     * <li>if the query has a root operand, the tree produced by a {@code TreeEvaluator};</li>
+     * <li>if the query has sorts, an {@code OrderByNode} wrapping whatever step 1 produced;</li>
+     * <li>otherwise an identifier node (email, name or uuid) when the query contains a single identifier, or an
+     * {@code AllNode} when nothing at all was specified.</li>
+     * </ol>
+     *
+     * @throws PersistenceException if the tree cannot be built, e.g. a sort property fails the index check
+     */
+    private void process() throws PersistenceException {
+
+        int opCount = 0;
+
+        // an operand tree was supplied: translate it into IR nodes
+        if ( rootOperand != null ) {
+
+            TreeEvaluator visitor = new TreeEvaluator();
+
+            rootOperand.visit( visitor );
+
+            rootNode = visitor.getRootNode();
+
+            opCount = visitor.getSliceCount();
+        }
+
+        // see if we have sorts, if so, we can add them all as a single node at
+        // the root
+        if ( !sorts.isEmpty() ) {
+
+            OrderByNode order = generateSorts( opCount );
+
+            opCount += order.getFirstPredicate().getAllSlices().size();
+
+            rootNode = order;
+        }
+
+
+        // if we still don't have a root node, no query nor order by was specified,
+        // just use the all node or the identifiers
+        if ( rootNode == null ) {
+
+            // a name alias or email alias was specified
+            if ( query.containsSingleNameOrEmailIdentifier() ) {
+
+                Identifier ident = query.getSingleIdentifier();
+
+                // an email was specified. An edge case that only applies to users. This is ugly to put here,
+                // but required. The constant is the receiver so a query without an entity type cannot cause an NPE.
+                if ( User.ENTITY_TYPE.equals( query.getEntityType() ) && ident.isEmail() ) {
+                    rootNode = new EmailIdentifierNode( ident );
+                }
+
+                // use the ident with the default alias. could be an email
+                else {
+                    rootNode = new NameIdentifierNode( ident.getName() );
+                }
+            }
+            // a uuid was specified
+            else if ( query.containsSingleUuidIdentifier() ) {
+                rootNode = new UuidIdentifierNode( query.getSingleUuidIdentifier() );
+            }
+
+            // nothing was specified, order it by uuid
+            else {
+
+                // this is a bit ugly, but how we handle the start parameter
+                UUID startResult = query.getStartResult();
+
+                boolean startResultSet = startResult != null;
+
+                AllNode allNode = new AllNode( 0, startResultSet );
+
+                if ( startResultSet ) {
+                    // seed the cursor for the all-node slice with the requested start uuid
+                    cursorCache.setNextCursor( allNode.getSlice().hashCode(),
+                            UUIDSerializer.get().toByteBuffer( startResult ) );
+                }
+
+                rootNode = allNode;
+            }
+        }
+
+        // with more than one slice always use the full page size, otherwise never more than the limit
+        if ( opCount > 1 ) {
+            pageSizeHint = PAGE_SIZE;
+        }
+        else {
+            pageSizeHint = Math.min( size, PAGE_SIZE );
+        }
+    }
+
+
+    /** @return the root of the IR tree built for the query */
+    public QueryNode getFirstNode() {
+        return this.rootNode;
+    }
+
+
+    /**
+     * Applies the sort direction and the cached cursor position to a slice. Only call this at evaluation time, once
+     * the IR tree has been fully constructed.
+     *
+     * @param slice the slice to adjust in place
+     */
+    public void applyCursorAndSort( QuerySlice slice ) {
+        // the sort goes first, since reversing the slice can change its hash code
+        final SortPredicate predicate = getSort( slice.getPropertyName() );
+
+        if ( predicate != null ) {
+            final boolean wantsDescending = predicate.getDirection() == SortDirection.DESCENDING;
+            final boolean needsFlip = wantsDescending != slice.isReversed();
+
+            // flip the slice (and with it its params) when its direction disagrees with the sort
+            if ( needsFlip ) {
+                slice.reverse();
+            }
+        }
+
+        // now look up the cursor under the (possibly updated) hash code
+        final ByteBuffer position = cursorCache.getCursorBytes( slice.hashCode() );
+
+        if ( position != null ) {
+            slice.setCursor( position );
+        }
+    }
+
+
+    /**
+     * Looks up the cursor bytes held in the cursor cache for a node.
+     *
+     * @param nodeId the id the cursor was cached under
+     * @return whatever the cursor cache returns for that id
+     */
+    public ByteBuffer getCursorCache( int nodeId ) {
+        final ByteBuffer cached = cursorCache.getCursorBytes( nodeId );
+        return cached;
+    }
+
+
+ private SortPredicate getSort( String propertyName ) {
+ for ( SortPredicate sort : sorts ) {
+ if ( sort.getPropertyName().equals( propertyName ) ) {
+ return sort;
+ }
+ }
+ return null;
+ }
+
+
+    /**
+     * Evaluates the IR tree with the given visitor and loads one page of results.
+     *
+     * Columns are pulled from the visitor's result iterator until at least {@code size} of them have been collected
+     * or the iterator is exhausted; the list is then trimmed to {@code size}. A cursor is only finalized when a full
+     * page was collected.
+     *
+     * @param visitor the visitor used to evaluate the tree
+     * @return the loaded results, or null when there is no root node or the loader returns null
+     * @throws Exception propagated from tree evaluation or result loading
+     */
+    public Results getResults( SearchVisitor visitor ) throws Exception {
+        // without a root node there is nothing to evaluate
+        if ( rootNode == null ) {
+            return null;
+        }
+
+        rootNode.visit( visitor );
+
+        final ResultIterator pages = visitor.getResults();
+
+        final int initialCapacity = Math.min( size, Query.MAX_LIMIT );
+        List<ScanColumn> columns = new ArrayList<ScanColumn>( initialCapacity );
+
+        final CursorCache outgoingCursor = new CursorCache();
+
+        // pull whole pages until the limit is reached or the iterator runs dry
+        for ( ; columns.size() < size && pages.hasNext(); ) {
+            columns.addAll( pages.next() );
+        }
+
+        // we may have paged through more entities than we want to return: trim, then set our cursor
+        if ( !columns.isEmpty() ) {
+            final int resultSize = Math.min( columns.size(), size );
+            columns = columns.subList( 0, resultSize );
+
+            // only a full page can have a successor
+            if ( resultSize == size ) {
+                final ScanColumn last = columns.get( resultSize - 1 );
+                pages.finalizeCursor( outgoingCursor, last.getUUID() );
+            }
+        }
+
+        if ( logger.isDebugEnabled() ) {
+            logger.debug("Getting result for query: [{}], returning entityIds size: {}", getQuery(), columns.size());
+        }
+
+        final ResultsLoader loader = loaderFactory.getResultsLoader( em, query, query.getResultsLevel() );
+        final Results loaded = loader.getResults( columns );
+
+        if ( loaded == null ) {
+            return null;
+        }
+
+        // hand the cursor from our tree evaluation back to the caller, together with the evaluation context
+        loaded.setCursor( outgoingCursor.asString() );
+        loaded.setQuery( query );
+        loaded.setQueryProcessor( this );
+        loaded.setSearchVisitor( visitor );
+
+        return loaded;
+    }
+
+
+ private class TreeEvaluator implements QueryVisitor {
+
+ // stack for nodes that will be used to construct the tree and create
+ // objects
+ private CountingStack<QueryNode> nodes = new CountingStack<QueryNode>();
+
+
+ // Last id handed to a created node. It is pre-incremented before every use, so the first id given out is 0.
+ private int contextCount = -1;
+
+
+        /**
+         * Get the root node in our tree for runtime evaluation.
+         *
+         * @return the node currently on top of the node stack
+         */
+        public QueryNode getRootNode() {
+            final QueryNode top = nodes.peek();
+            return top;
+        }
+
+
+        /**
+         * Visits both children of an AND. When the two children ended up in different nodes, those nodes are replaced
+         * on the stack by a single {@code AndNode}; when both children were folded into the same node, the stack is
+         * left as it is.
+         *
+         * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
+         * .persistence.query.tree.AndOperand)
+         */
+        @Override
+        public void visit( AndOperand op ) throws PersistenceException {
+
+            op.getLeft().visit( this );
+            final QueryNode afterLeft = nodes.peek();
+
+            op.getRight().visit( this );
+            final QueryNode afterRight = nodes.peek();
+
+            // distinct nodes on top of the stack: combine them, right operand first since it was pushed last
+            if ( afterLeft != afterRight ) {
+                final QueryNode rightChild = nodes.pop();
+                final QueryNode leftChild = nodes.pop();
+
+                nodes.push( new AndNode( leftChild, rightChild ) );
+            }
+        }
+
+
+        /**
+         * Visits both children of an OR, each in its own slice context where one is required, and replaces their two
+         * result nodes on the stack with a single {@code OrNode}.
+         *
+         * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
+         * .persistence.query.tree.OrOperand)
+         */
+        @Override
+        public void visit( OrOperand op ) throws PersistenceException {
+
+            final Operand leftOperand = op.getLeft();
+            final Operand rightOperand = op.getRight();
+
+            // each side gets a fresh slice node when createNewSlice decides it needs one
+            createNewSlice( leftOperand );
+            leftOperand.visit( this );
+
+            createNewSlice( rightOperand );
+            rightOperand.visit( this );
+
+            // the right result was pushed last, so it comes off first
+            final QueryNode rightNode = nodes.pop();
+            final QueryNode leftNode = nodes.pop();
+
+            final int orId = ++contextCount;
+
+            nodes.push( new OrNode( leftNode, rightNode, orId ) );
+        }
+
+
+        /**
+         * Visits the child of a NOT in its own context and replaces its result node on the stack with a
+         * {@code NotNode} built from that result and a new {@code AllNode}.
+         *
+         * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
+         * .persistence.query.tree.NotOperand)
+         */
+        @Override
+        public void visit( NotOperand op ) throws PersistenceException {
+
+            // any child of NOT needs to be evaluated independently
+            final Operand negated = op.getOperation();
+            createNewSlice( negated );
+            negated.visit( this );
+
+            final QueryNode excluded = nodes.pop();
+            final AllNode everything = new AllNode( ++contextCount, false );
+
+            nodes.push( new NotNode( excluded, everything ) );
+        }
+
+
+        /**
+         * Adds a full-text range (start and end value of the search string, both inclusive) for the property's
+         * indexed name to a slice node.
+         *
+         * @throws NoFullTextIndexException if the property is not full-text indexed for the entity type
+         * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
+         * .persistence.query.tree.ContainsOperand)
+         */
+        @Override
+        public void visit( ContainsOperand op ) throws NoFullTextIndexException {
+
+            String propertyName = op.getProperty().getValue();
+
+            if ( !SCHEMA.isPropertyFulltextIndexed( entityType, propertyName ) ) {
+                throw new NoFullTextIndexException( entityType, propertyName );
+            }
+
+            StringLiteral string = op.getString();
+
+            // looked up once; it is used both for the clash check and as the range's field name
+            String indexName = op.getProperty().getIndexedValue();
+
+            SliceNode node;
+
+            // sdg - if left & right have same field name, we need to create a new
+            // slice
+            if ( !nodes.isEmpty() && nodes.peek() instanceof SliceNode
+                    && ( ( SliceNode ) nodes.peek() ).getSlice( indexName ) != null ) {
+                node = newSliceNode();
+            }
+            else {
+                node = getUnionNode( op );
+            }
+
+            node.setStart( indexName, string.getValue(), true );
+            node.setFinish( indexName, string.getEndValue(), true );
+        }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
+ * .persistence.query.tree.WithinOperand)
+ */
+ @Override
+ public void visit( WithinOperand op ) {
+
+ // A WithinNode is pushed unconditionally: it is never folded into a slice node already on the stack.
+ // It is built from the property's indexed name, the distance, latitude and longitude as floats, and a
+ // fresh node id.
+ // change the property name to coordinates
+ // NOTE(review): presumably getIndexedName() is what performs that renaming — confirm.
+ nodes.push( new WithinNode( op.getProperty().getIndexedName(), op.getDistance().getFloatValue(),
+ op.getLattitude().getFloatValue(), op.getLongitude().getFloatValue(), ++contextCount ) );
+ }
+
+
+        /**
+         * Sets an exclusive upper bound for the property on the current slice node.
+         *
+         * @throws NoIndexException if the property fails the index check
+         * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
+         * .persistence.query.tree.LessThan)
+         */
+        @Override
+        public void visit( LessThan op ) throws NoIndexException {
+            final String property = op.getProperty().getValue();
+
+            checkIndexed( property );
+
+            final SliceNode target = getUnionNode( op );
+            target.setFinish( property, op.getLiteral().getValue(), false );
+        }
+
+
+        /**
+         * Sets an inclusive upper bound for the property on the current slice node.
+         *
+         * @throws NoIndexException if the property fails the index check
+         * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
+         * .persistence.query.tree.LessThanEqual)
+         */
+        @Override
+        public void visit( LessThanEqual op ) throws NoIndexException {
+            final String property = op.getProperty().getValue();
+
+            checkIndexed( property );
+
+            final SliceNode target = getUnionNode( op );
+            target.setFinish( property, op.getLiteral().getValue(), true );
+        }
+
+
+        /**
+         * Expresses an equality as an inclusive range on the current slice node. The start is always the literal's
+         * value. The finish is the literal's value too, except for string literals, where the literal's end value is
+         * used and no finish is set at all when that end value is null.
+         *
+         * @throws NoIndexException if the property fails the index check
+         * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
+         * .persistence.query.tree.Equal)
+         */
+        @Override
+        public void visit( Equal op ) throws NoIndexException {
+            final String property = op.getProperty().getValue();
+
+            checkIndexed( property );
+
+            final Literal<?> literal = op.getLiteral();
+            final SliceNode target = getUnionNode( op );
+
+            if ( !( literal instanceof StringLiteral ) ) {
+                target.setFinish( property, literal.getValue(), true );
+            }
+            else {
+                // string literals are the edge case: should more of these appear, push the handling down into the
+                // literal classes
+                final String upperBound = ( ( StringLiteral ) literal ).getEndValue();
+
+                if ( upperBound != null ) {
+                    target.setFinish( property, upperBound, true );
+                }
+            }
+
+            target.setStart( property, literal.getValue(), true );
+        }
+
+
+        /**
+         * Sets an exclusive lower bound for the property on the current slice node.
+         *
+         * @throws NoIndexException if the property fails the index check
+         * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
+         * .persistence.query.tree.GreaterThan)
+         */
+        @Override
+        public void visit( GreaterThan op ) throws NoIndexException {
+            final String property = op.getProperty().getValue();
+
+            checkIndexed( property );
+
+            final SliceNode target = getUnionNode( op );
+            target.setStart( property, op.getLiteral().getValue(), false );
+        }
+
+
+        /**
+         * Sets an inclusive lower bound for the property on the current slice node.
+         *
+         * @throws NoIndexException if the property fails the index check
+         * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
+         * .persistence.query.tree.GreaterThanEqual)
+         */
+        @Override
+        public void visit( GreaterThanEqual op ) throws NoIndexException {
+            final String property = op.getProperty().getValue();
+
+            checkIndexed( property );
+
+            final SliceNode target = getUnionNode( op );
+            target.setStart( property, op.getLiteral().getValue(), true );
+        }
+
+
+ /**
+ * Return the current leaf node to add to if it exists. This means that we can compress multiple 'AND'
+ * operations and ranges into a single node. Otherwise a new node is created and pushed to the stack
+ *
+ * @param current The current operand node
+ */
+ private SliceNode getUnionNode( EqualityOperand current ) {
+
+ /**
+ * we only create a new slice node in 3 situations 1. No nodes exist 2.
+ * The parent node is not an AND node. Meaning we can't add this slice to
+ * the current set of slices 3. Our current top of stack is not a slice
+ * node.
+ */
+ // no nodes exist
+ if ( nodes.size() == 0 || !( nodes.peek() instanceof SliceNode ) ) {
+ return newSliceNode();
+ }
+
+ return ( SliceNode ) nodes.peek();
+ }
+
+
+ /** The new slice node */
+ private SliceNode newSliceNode() {
+ SliceNode sliceNode = new SliceNode( ++contextCount );
+
+ nodes.push( sliceNode );
+
+ return sliceNode;
+ }
+
+
+ /** Create a new slice if one will be required within the context of this node */
+ private void createNewSlice( Operand child ) {
+ if ( child instanceof EqualityOperand || child instanceof AndOperand || child instanceof ContainsOperand ) {
+ newSliceNode();
+ }
+ }
+
+
+        /** @return the slice count reported by the node stack */
+        public int getSliceCount() {
+            final int slices = nodes.getSliceCount();
+            return slices;
+        }
+ }
+
+
+ private static class CountingStack<T> extends Stack<T> {
+
+ private int count = 0;
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+
+ /* (non-Javadoc)
+ * @see java.util.Stack#pop()
+ */
+ @Override
+ public synchronized T pop() {
+ T entry = super.pop();
+
+ if ( entry instanceof SliceNode ) {
+ count += ( ( SliceNode ) entry ).getAllSlices().size();
+ }
+
+ return entry;
+ }
+
+
+ public int getSliceCount() {
+
+ Iterator<T> itr = this.iterator();
+
+ T entry;
+
+ while ( itr.hasNext() ) {
+ entry = itr.next();
+
+ if ( entry instanceof SliceNode ) {
+ count += ( ( SliceNode ) entry ).getAllSlices().size();
+ }
+ }
+
+ return count;
+ }
+ }
+
+
+    /**
+     * Returns the page size a node should use.
+     *
+     * @param node the node asking for a hint
+     * @return the query limit for the root node, the computed page size hint for every other node
+     */
+    public int getPageSizeHint( QueryNode node ) {
+        /*****
+         * DO NOT REMOVE THE ROOT NODE SPECIAL CASE!!!!!!!!!!!
+         * It is crucial that the root iterator only needs the result set size per page
+         * otherwise our cursor logic will fail when passing cursor data to the leaf nodes
+         *******/
+        return ( node == rootNode ) ? size : pageSizeHint;
+    }
+
+
+ /** Generate a slice node with scan ranges for all the properties in our sort cache */
+ private OrderByNode generateSorts( int opCount ) throws NoIndexException {
+
+ // the value is irrelevant since we'll only ever have 1 slice node
+ // if this is called
+ SliceNode slice = new SliceNode( opCount );
+
+ SortPredicate first = sorts.get( 0 );
+
+ String propertyName = first.getPropertyName();
+
+ checkIndexed( propertyName );
+
+ slice.setStart( propertyName, null, true );
+ slice.setFinish( propertyName, null, true );
+
+
+ for ( int i = 1; i < sorts.size(); i++ ) {
+ checkIndexed( sorts.get( i ).getPropertyName() );
+ }
+
+
+ return new OrderByNode( slice, sorts.subList( 1, sorts.size() ), rootNode );
+ }
+
+
+    /**
+     * Verifies that a property may be used in a query.
+     *
+     * @param propertyName the property to check
+     * @throws NoIndexException if the name is null or empty, or if the schema does not index the property for the
+     * entity type while collection info is present
+     */
+    private void checkIndexed( String propertyName ) throws NoIndexException {
+
+        // a missing name can never be indexed
+        if ( propertyName == null || propertyName.isEmpty() ) {
+            throw new NoIndexException( entityType, propertyName );
+        }
+
+        // the schema check only counts when we know the collection
+        if ( !SCHEMA.isPropertyIndexed( entityType, propertyName ) && collectionInfo != null ) {
+            throw new NoIndexException( entityType, propertyName );
+        }
+    }
+
+
+    /** @return the entity manager supplied at construction time */
+    public EntityManager getEntityManager() {
+        return this.em;
+    }
+}