You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/01/30 16:21:09 UTC
[33/34] git commit: update to master
update to master
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9fec2baa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9fec2baa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9fec2baa
Branch: refs/pull/39/head
Commit: 9fec2baafe85ac97d3b16f4497166b1e79b87fe6
Parents: 3ccbfd8 33e0018
Author: Lewis John McGibbney <le...@apache.org>
Authored: Thu Jan 30 14:20:21 2014 +0000
Committer: Lewis John McGibbney <le...@apache.org>
Committed: Thu Jan 30 14:20:21 2014 +0000
----------------------------------------------------------------------
portal/Gruntfile.js | 40 +++++++--
portal/js/libs/usergrid.sdk.js | 4 +-
portal/tests/protractor/applications.spec.js | 6 +-
portal/tests/protractor/forgotPassword.spec.js | 5 ++
portal/tests/protractor/profile.spec.js | 14 ++--
portal/tests/protractor/users.spec.js | 5 +-
portal/tests/protractor/util.js | 34 ++++++--
portal/tests/protractorConf.js | 11 +--
.../persistence/cassandra/CassandraService.java | 7 +-
.../persistence/cassandra/QueryProcessor.java | 2 +-
.../cassandra/RelationManagerImpl.java | 22 ++---
.../cassandra/index/ConnectedIndexScanner.java | 77 +++++++++++++----
.../cassandra/index/IndexBucketScanner.java | 33 ++++++--
.../persistence/query/ir/SearchVisitor.java | 7 +-
.../query/ir/result/IntersectionIterator.java | 4 +-
.../query/ir/result/MergeIterator.java | 28 +++++--
.../query/ir/result/SliceIterator.java | 12 +--
.../query/ir/result/UnionIterator.java | 17 +++-
.../query/AllInConnectionNoTypeIT.java | 1 +
.../query/IntersectionUnionPagingIT.java | 25 ++++--
.../ir/result/IntersectionIteratorTest.java | 12 +--
.../query/ir/result/UnionIteratorTest.java | 88 ++++++++++++++++++++
stack/tools/pom.xml | 4 +
.../apache/usergrid/tools/EntityCleanup.java | 4 +-
.../usergrid/tools/UniqueIndexCleanup.java | 4 +-
.../apache/usergrid/tools/bean/ExportOrg.java | 11 +++
26 files changed, 363 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9fec2baa/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
index 1b2b9e3,0000000..d87ebd6
mode 100644,000000..100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
@@@ -1,1134 -1,0 +1,1137 @@@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.locking.LockManager;
+import org.apache.usergrid.persistence.IndexBucketLocator;
+import org.apache.usergrid.persistence.IndexBucketLocator.IndexType;
+import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
+import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.prettyprint.cassandra.connection.HConnectionManager;
+import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import me.prettyprint.cassandra.service.CassandraHostConfigurator;
+import me.prettyprint.cassandra.service.ThriftKsDef;
+import me.prettyprint.hector.api.Cluster;
+import me.prettyprint.hector.api.ConsistencyLevelPolicy;
+import me.prettyprint.hector.api.HConsistencyLevel;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.Serializer;
+import me.prettyprint.hector.api.beans.ColumnSlice;
+import me.prettyprint.hector.api.beans.DynamicComposite;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.beans.OrderedRows;
+import me.prettyprint.hector.api.beans.Row;
+import me.prettyprint.hector.api.beans.Rows;
+import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
+import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
+import me.prettyprint.hector.api.factory.HFactory;
+import me.prettyprint.hector.api.mutation.Mutator;
+import me.prettyprint.hector.api.query.ColumnQuery;
+import me.prettyprint.hector.api.query.CountQuery;
+import me.prettyprint.hector.api.query.MultigetSliceQuery;
+import me.prettyprint.hector.api.query.QueryResult;
+import me.prettyprint.hector.api.query.RangeSlicesQuery;
+import me.prettyprint.hector.api.query.SliceQuery;
+import static me.prettyprint.cassandra.service.FailoverPolicy.ON_FAIL_TRY_ALL_AVAILABLE;
+import static me.prettyprint.hector.api.factory.HFactory.createColumn;
+import static me.prettyprint.hector.api.factory.HFactory.createMultigetSliceQuery;
+import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static me.prettyprint.hector.api.factory.HFactory.createRangeSlicesQuery;
+import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
+import static me.prettyprint.hector.api.factory.HFactory.createVirtualKeyspace;
+import static org.apache.commons.collections.MapUtils.getIntValue;
+import static org.apache.commons.collections.MapUtils.getString;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_ID_SETS;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.buildSetIdListMutator;
+import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
+import static org.apache.usergrid.utils.ConversionUtils.bytebuffers;
+import static org.apache.usergrid.utils.JsonUtils.mapToFormattedJsonString;
+import static org.apache.usergrid.utils.MapUtils.asMap;
+import static org.apache.usergrid.utils.MapUtils.filter;
+
+
+public class CassandraService {
+
+ /** Name of the keyspace that init() opens as the system keyspace. */
+ public static String SYSTEM_KEYSPACE = "Usergrid";
+
+ /** Keyspace returned by keyspaceForApplication() for every application while USE_VIRTUAL_KEYSPACES is true. */
+ public static String STATIC_APPLICATION_KEYSPACE = "Usergrid_Applications";
+
+ /** When true, getKeyspace() creates a virtual keyspace keyed by the application UUID prefix. */
+ public static final boolean USE_VIRTUAL_KEYSPACES = true;
+
+ // Column family names
+ public static final String APPLICATIONS_CF = "Applications";
+ public static final String PROPERTIES_CF = "Properties";
+ public static final String TOKENS_CF = "Tokens";
+ public static final String PRINCIPAL_TOKEN_CF = "PrincipalTokens";
+
+ // Query size limits; ALL_COUNT is the column limit used by the "get all columns" queries in this class
+ public static final int DEFAULT_COUNT = 1000;
+ public static final int ALL_COUNT = 100000;
+ public static final int INDEX_ENTRY_LIST_COUNT = 1000;
+ public static final int DEFAULT_SEARCH_COUNT = 10000;
+
+ /** Retry count handed to batchExecute() when flushing batched mutations. */
+ public static final int RETRY_COUNT = 5;
+
+ public static final String DEFAULT_APPLICATION = "default-app";
+ public static final String DEFAULT_ORGANIZATION = "usergrid";
+ public static final String MANAGEMENT_APPLICATION = "management";
+
+ public static final UUID MANAGEMENT_APPLICATION_ID = new UUID( 0, 1 );
+ public static final UUID DEFAULT_APPLICATION_ID = new UUID( 0, 16 );
+
+ private static final Logger logger = LoggerFactory.getLogger( CassandraService.class );
+
+ /** Separate logger (package name + ".DB") used to trace the individual database calls made by this class. */
+ private static final Logger db_logger =
+ LoggerFactory.getLogger( CassandraService.class.getPackage().getName() + ".DB" );
+
+ Cluster cluster;
+ CassandraHostConfigurator chc;
+ Properties properties;
+ LockManager lockManager;
+
+ /** Policy passed to every keyspace created here; init() installs a default when none was set. */
+ ConsistencyLevelPolicy consistencyLevelPolicy;
+
+ /** Keyspace handle for SYSTEM_KEYSPACE; assigned in init(). */
+ private Keyspace systemKeyspace;
+
+ /** Credentials map ("username"/"password") built in init() and passed when creating keyspaces. */
+ private Map<String, String> accessMap;
+
+ // Shared serializer instances used by the query and mutation helpers below
+ public static final StringSerializer se = new StringSerializer();
+ public static final ByteBufferSerializer be = new ByteBufferSerializer();
+ public static final UUIDSerializer ue = new UUIDSerializer();
+ public static final BytesArraySerializer bae = new BytesArraySerializer();
+ public static final DynamicCompositeSerializer dce = new DynamicCompositeSerializer();
+ public static final LongSerializer le = new LongSerializer();
+
+ public static final UUID NULL_ID = new UUID( 0, 0 );
+
+
+ /**
+  * Stores the collaborators this service works with and logs the cluster's known pool hosts.
+  * No keyspace is opened here; call {@code init()} for that.
+  *
+  * @param properties configuration properties (read later for credentials and keyspace settings)
+  * @param cluster the Hector cluster to operate on
+  * @param cassandraHostConfigurator host configurator, later used as the clock source for timestamps
+  * @param lockManager the lock manager exposed through {@code getLockManager()}
+  */
+ public CassandraService( Properties properties, Cluster cluster,
+         CassandraHostConfigurator cassandraHostConfigurator, LockManager lockManager ) {
+     this.properties = properties;
+     this.cluster = cluster;
+     this.chc = cassandraHostConfigurator;
+     this.lockManager = lockManager;
+     // String.valueOf yields the same text as concatenating with an empty string
+     db_logger.info( String.valueOf( cluster.getKnownPoolHosts( false ) ) );
+ }
+
+
+ public void init() throws Exception {
+ if ( consistencyLevelPolicy == null ) {
+ consistencyLevelPolicy = new ConfigurableConsistencyLevel();
+ ( ( ConfigurableConsistencyLevel ) consistencyLevelPolicy )
+ .setDefaultReadConsistencyLevel( HConsistencyLevel.ONE );
+ }
+ accessMap = new HashMap<String, String>( 2 );
+ accessMap.put( "username", properties.getProperty( "cassandra.username" ) );
+ accessMap.put( "password", properties.getProperty( "cassandra.password" ) );
+ systemKeyspace =
+ HFactory.createKeyspace( SYSTEM_KEYSPACE, cluster, consistencyLevelPolicy, ON_FAIL_TRY_ALL_AVAILABLE,
+ accessMap );
+ }
+
+
+ /** @return the Hector cluster this service operates on */
+ public Cluster getCluster() {
+ return cluster;
+ }
+
+
+ /** Replaces the cluster used for subsequent calls; keyspaces that were already created are not touched here. */
+ public void setCluster( Cluster cluster ) {
+ this.cluster = cluster;
+ }
+
+
+ /** @return the host configurator, which createTimestamp() also uses as its clock source */
+ public CassandraHostConfigurator getCassandraHostConfigurator() {
+ return chc;
+ }
+
+
+ /** Replaces the host configurator (and therefore the clock used by createTimestamp()). */
+ public void setCassandraHostConfigurator( CassandraHostConfigurator chc ) {
+ this.chc = chc;
+ }
+
+
+ /** @return the configuration properties this service was given (the live instance, not a copy) */
+ public Properties getProperties() {
+ return properties;
+ }
+
+
+ /** Replaces the configuration properties; values already read by init() are not re-read here. */
+ public void setProperties( Properties properties ) {
+ this.properties = properties;
+ }
+
+
+ /**
+  * Exposes the configuration properties as a map.
+  *
+  * @return the result of {@code asMap(properties)}, or {@code null} when no properties are set
+  */
+ public Map<String, String> getPropertiesMap() {
+     if ( properties == null ) {
+         return null;
+     }
+     return asMap( properties );
+ }
+
+
+ /** @return the lock manager supplied at construction time or through setLockManager() */
+ public LockManager getLockManager() {
+ return lockManager;
+ }
+
+
+ /** Replaces the lock manager returned by getLockManager(). */
+ public void setLockManager( LockManager lockManager ) {
+ this.lockManager = lockManager;
+ }
+
+
+ /** @return the consistency policy applied to created keyspaces; null until set or until init() installs a default */
+ public ConsistencyLevelPolicy getConsistencyLevelPolicy() {
+ return consistencyLevelPolicy;
+ }
+
+
+ /** Sets the consistency policy; when set before init(), init() keeps it instead of installing its default. */
+ public void setConsistencyLevelPolicy( ConsistencyLevelPolicy consistencyLevelPolicy ) {
+ this.consistencyLevelPolicy = consistencyLevelPolicy;
+ }
+
+
+ /** @return keyspace for application UUID */
+ public static String keyspaceForApplication( UUID applicationId ) {
+ if ( USE_VIRTUAL_KEYSPACES ) {
+ return STATIC_APPLICATION_KEYSPACE;
+ }
+ else {
+ return "Application_" + applicationId.toString().replace( '-', '_' );
+ }
+ }
+
+
+ public static UUID prefixForApplication( UUID applicationId ) {
+ if ( USE_VIRTUAL_KEYSPACES ) {
+ return applicationId;
+ }
+ else {
+ return null;
+ }
+ }
+
+
+ public Keyspace getKeyspace( String keyspace, UUID prefix ) {
+ Keyspace ko = null;
+ if ( USE_VIRTUAL_KEYSPACES && ( prefix != null ) ) {
+ ko = createVirtualKeyspace( keyspace, prefix, ue, cluster, consistencyLevelPolicy,
+ ON_FAIL_TRY_ALL_AVAILABLE, accessMap );
+ }
+ else {
+ ko = HFactory.createKeyspace( keyspace, cluster, consistencyLevelPolicy, ON_FAIL_TRY_ALL_AVAILABLE,
+ accessMap );
+ }
+ return ko;
+ }
+
+
+ /**
+  * Creates the keyspace handle for one application, combining {@code keyspaceForApplication}
+  * and {@code prefixForApplication}.
+  *
+  * @param applicationId the application UUID; must not be null (checked with an assertion only)
+  * @return the keyspace handle for that application
+  */
+ public Keyspace getApplicationKeyspace( UUID applicationId ) {
+     assert applicationId != null;
+     String keyspaceName = keyspaceForApplication( applicationId );
+     UUID prefix = prefixForApplication( applicationId );
+     return getKeyspace( keyspaceName, prefix );
+ }
+
+
+ /**
+ * Creates a handle on the shared application keyspace (STATIC_APPLICATION_KEYSPACE) itself. A null prefix is
+ * passed, so getKeyspace() returns a plain keyspace rather than a per-application virtual one.
+ */
+ public Keyspace getUsergridApplicationKeyspace() {
+ return getKeyspace( STATIC_APPLICATION_KEYSPACE, null );
+ }
+
+
+ /** @return the system keyspace handle created by init(); null if init() has not run yet */
+ public Keyspace getSystemKeyspace() {
+ return systemKeyspace;
+ }
+
+
+ /**
+  * Checks whether both the system keyspace and the shared application keyspace are known to the cluster.
+  *
+  * @return {@code true} only when both keyspaces can be described; {@code false} when either is
+  *         missing or when describing them fails (the failure is logged, not rethrown)
+  */
+ public boolean checkKeyspacesExist() {
+     try {
+         return cluster.describeKeyspace( SYSTEM_KEYSPACE ) != null
+                 && cluster.describeKeyspace( STATIC_APPLICATION_KEYSPACE ) != null;
+     }
+     catch ( Exception ex ) {
+         logger.error( "could not describe keyspaces", ex );
+         return false;
+     }
+ }
+
+
+ /**
+  * Lazily creates a column family, creating its keyspace first when that does not exist yet.
+  * Nothing happens when the column family is already present. A newly created column family gets
+  * a read repair chance of 0.1 and is added with the blocking flag set to {@code true}.
+  *
+  * @param keyspace name of the keyspace that should contain the column family
+  * @param cfDef definition of the column family to create
+  */
+ public void createColumnFamily( String keyspace, ColumnFamilyDefinition cfDef ) {
+     if ( !keySpaceExists( keyspace ) ) {
+         createKeySpace( keyspace );
+     }
+
+     if ( cfExists( keyspace, cfDef.getName() ) ) {
+         return;
+     }
+
+     // default read repair chance to 0.1
+     cfDef.setReadRepairChance( 0.1d );
+     cluster.addColumnFamily( cfDef, true );
+     logger.info( "Created column family {} in keyspace {}", cfDef.getName(), keyspace );
+ }
+
+
+ /**
+ * Creates every column family in the list, in list order, by delegating to createColumnFamily(); column families
+ * that already exist are left untouched by that method.
+ *
+ * @param keyspace name of the keyspace the column families belong to
+ * @param cfDefs definitions of the column families to create
+ */
+ public void createColumnFamilies( String keyspace, List<ColumnFamilyDefinition> cfDefs ) {
+ for ( ColumnFamilyDefinition cfDef : cfDefs ) {
+ createColumnFamily( keyspace, cfDef );
+ }
+ }
+
+
+ /**
+  * Checks whether the keyspace exists.
+  *
+  * @param keyspace the keyspace name
+  * @return {@code true} when the cluster returns a definition for the keyspace
+  */
+ public boolean keySpaceExists( String keyspace ) {
+     return cluster.describeKeyspace( keyspace ) != null;
+ }
+
+
+ /**
+  * Creates the keyspace and waits for the schema change to settle.
+  * The placement strategy ({@code cassandra.keyspace.strategy}, default SimpleStrategy), the
+  * replication factor ({@code cassandra.keyspace.replication}, default 1) and any strategy options
+  * (properties prefixed with {@code cassandra.keyspace.strategy.options.}) come from the
+  * configuration properties.
+  *
+  * @param keyspace name of the keyspace to create
+  */
+ private void createKeySpace( String keyspace ) {
+     logger.info( "Creating keyspace: {}", keyspace );
+
+     String strategyClass =
+             getString( properties, "cassandra.keyspace.strategy", "org.apache.cassandra.locator.SimpleStrategy" );
+     logger.info( "Using strategy: {}", strategyClass );
+
+     int replicationFactor = getIntValue( properties, "cassandra.keyspace.replication", 1 );
+     logger.info( "Using replication (may be overriden by strategy options): {}", replicationFactor );
+
+     ThriftKsDef keyspaceDefinition = ( ThriftKsDef ) HFactory
+             .createKeyspaceDefinition( keyspace, strategyClass, replicationFactor,
+                     new ArrayList<ColumnFamilyDefinition>() );
+
+     @SuppressWarnings({ "unchecked", "rawtypes" }) Map<String, String> strategyOptions =
+             filter( ( Map ) properties, "cassandra.keyspace.strategy.options.", true );
+     if ( !strategyOptions.isEmpty() ) {
+         logger.info( "Strategy options: {}", mapToFormattedJsonString( strategyOptions ) );
+         keyspaceDefinition.setStrategyOptions( strategyOptions );
+     }
+
+     cluster.addKeyspace( keyspaceDefinition );
+     waitForCreation( keyspace );
+
+     logger.info( "Created keyspace {}", keyspace );
+ }
+
+
+ /**
+  * Waits until the cluster reports exactly one schema version, polling every 100 ms.
+  * If the waiting thread is interrupted, its interrupt status is restored and the wait is
+  * abandoned, so that callers (and thread pools) can observe the interruption instead of this
+  * method silently polling forever.
+  *
+  * @param keyspace name of the keyspace being created; used for diagnostics only
+  */
+ private void waitForCreation( String keyspace ) {
+     while ( true ) {
+         Map<String, List<String>> versions = cluster.describeSchemaVersions();
+         // only 1 version left: the schema has converged
+         if ( versions != null && versions.size() == 1 ) {
+             return;
+         }
+         // sleep and try again
+         try {
+             Thread.sleep( 100 );
+         }
+         catch ( InterruptedException e ) {
+             // restore the flag rather than swallowing it; continuing to loop would spin on sleep()
+             Thread.currentThread().interrupt();
+             logger.warn( "Interrupted while waiting for schema agreement on keyspace {}", keyspace );
+             return;
+         }
+     }
+ }
+
+
+ /**
+  * Checks whether a column family exists in a keyspace.
+  *
+  * @param keyspace the keyspace name
+  * @param cfName the column family name to look for
+  * @return {@code true} when the keyspace exists and lists a column family with exactly that name
+  */
+ public boolean cfExists( String keyspace, String cfName ) {
+     KeyspaceDefinition definition = cluster.describeKeyspace( keyspace );
+
+     if ( definition != null ) {
+         for ( ColumnFamilyDefinition candidate : definition.getCfDefs() ) {
+             if ( cfName.equals( candidate.getName() ) ) {
+                 return true;
+             }
+         }
+     }
+
+     return false;
+ }
+
+
+ /**
+  * Reads the columns of a single row with an open-ended column range, limited to
+  * {@code ALL_COUNT} columns.
+  *
+  * @param ko the keyspace to query
+  * @param columnFamily the column family; its {@code toString()} value is used as the name
+  * @param key the row key
+  * @param nameSerializer serializer for column names
+  * @param valueSerializer serializer for column values
+  *
+  * @return the columns of the returned slice
+  *
+  * @throws Exception the exception
+  */
+ public <N, V> List<HColumn<N, V>> getAllColumns( Keyspace ko, Object columnFamily, Object key,
+         Serializer<N> nameSerializer, Serializer<V> valueSerializer ) throws Exception {
+
+     if ( db_logger.isInfoEnabled() ) {
+         db_logger.info( "getColumns cf={} key={}", columnFamily, key );
+     }
+
+     SliceQuery<ByteBuffer, N, V> sliceQuery = createSliceQuery( ko, be, nameSerializer, valueSerializer );
+     sliceQuery.setColumnFamily( columnFamily.toString() );
+     sliceQuery.setKey( bytebuffer( key ) );
+     sliceQuery.setRange( null, null, false, ALL_COUNT );
+     List<HColumn<N, V>> columns = sliceQuery.execute().get().getColumns();
+
+     if ( db_logger.isInfoEnabled() ) {
+         if ( columns != null ) {
+             db_logger.info( "getColumns returned {} columns", columns.size() );
+         }
+         else {
+             db_logger.info( "getColumns returned null" );
+         }
+     }
+
+     return columns;
+ }
+
+
+ /**
+ * Convenience overload of getAllColumns() that reads column names as strings and values as raw byte buffers.
+ *
+ * @param ko the keyspace to query
+ * @param columnFamily the column family
+ * @param key the row key
+ *
+ * @return the columns of the row
+ *
+ * @throws Exception the exception
+ */
+ public List<HColumn<String, ByteBuffer>> getAllColumns( Keyspace ko, Object columnFamily, Object key )
+ throws Exception {
+ return getAllColumns( ko, columnFamily, key, se, be );
+ }
+
+
+ /**
+  * Collects the names of all columns of a row, preserving the order in which they were returned.
+  *
+  * @param ko the keyspace to query
+  * @param columnFamily the column family
+  * @param key the row key
+  *
+  * @return the column names as an insertion-ordered set
+  *
+  * @throws Exception the exception
+  */
+ public Set<String> getAllColumnNames( Keyspace ko, Object columnFamily, Object key ) throws Exception {
+     Set<String> names = new LinkedHashSet<String>();
+     for ( HColumn<String, ByteBuffer> column : getAllColumns( ko, columnFamily, key ) ) {
+         names.add( column.getName() );
+     }
+     return names;
+ }
+
+
+ /**
+  * Reads a range of columns from a single row, with names and values returned as raw byte buffers.
+  *
+  * @param ko the keyspace to query
+  * @param columnFamily the column family; its {@code toString()} value is used as the name
+  * @param key the row key
+  * @param start the first column name of the range (DynamicComposite, List or any other value)
+  * @param finish the last column name of the range (same accepted forms as {@code start})
+  * @param count the maximum number of columns to return
+  * @param reversed whether the slice is read in reversed order
+  *
+  * @return the columns of the returned slice
+  *
+  * @throws Exception the exception
+  */
+ public List<HColumn<ByteBuffer, ByteBuffer>> getColumns( Keyspace ko, Object columnFamily, Object key, Object start,
+         Object finish, int count, boolean reversed ) throws Exception {
+
+     if ( db_logger.isDebugEnabled() ) {
+         db_logger.debug( "getColumns cf=" + columnFamily + " key=" + key + " start=" + start + " finish=" + finish
+                 + " count=" + count + " reversed=" + reversed );
+     }
+
+     SliceQuery<ByteBuffer, ByteBuffer, ByteBuffer> sliceQuery = createSliceQuery( ko, be, be, be );
+     sliceQuery.setColumnFamily( columnFamily.toString() );
+     sliceQuery.setKey( bytebuffer( key ) );
+
+     ByteBuffer rangeStart = serializeRangeBound( start );
+     ByteBuffer rangeFinish = serializeRangeBound( finish );
+
+     // the bounds are passed through as given, also for reversed slices
+     sliceQuery.setRange( rangeStart, rangeFinish, reversed, count );
+     List<HColumn<ByteBuffer, ByteBuffer>> columns = sliceQuery.execute().get().getColumns();
+
+     if ( db_logger.isDebugEnabled() ) {
+         if ( columns != null ) {
+             db_logger.debug( "getColumns returned " + columns.size() + " columns" );
+         }
+         else {
+             db_logger.debug( "getColumns returned null" );
+         }
+     }
+
+     return columns;
+ }
+
+
+ /** Serializes one slice bound; DynamicComposite is tested before List so it keeps its own serialization. */
+ private static ByteBuffer serializeRangeBound( Object bound ) {
+     if ( bound instanceof DynamicComposite ) {
+         return ( ( DynamicComposite ) bound ).serialize();
+     }
+     if ( bound instanceof List ) {
+         return DynamicComposite.toByteBuffer( ( List<?> ) bound );
+     }
+     return bytebuffer( bound );
+ }
+
+
+ /**
+  * Reads the same column range from several rows in one multiget query.
+  *
+  * @param ko the keyspace to query
+  * @param columnFamily the column family; its {@code toString()} value is used as the name
+  * @param keys the row keys
+  * @param start the first column name of the range (DynamicComposite, List or any other value)
+  * @param finish the last column name of the range (same accepted forms as {@code start})
+  * @param count the maximum number of columns per row
+  * @param reversed whether the slices are read in reversed order
+  *
+  * @return the columns per row key, in the iteration order of the query result
+  *
+  * @throws Exception the exception
+  */
+ public Map<ByteBuffer, List<HColumn<ByteBuffer, ByteBuffer>>> multiGetColumns( Keyspace ko, Object columnFamily,
+         List<?> keys, Object start, Object finish, int count, boolean reversed ) throws Exception {
+
+     if ( db_logger.isDebugEnabled() ) {
+         db_logger.debug( "multiGetColumns cf=" + columnFamily + " keys=" + keys + " start=" + start + " finish="
+                 + finish + " count=" + count + " reversed=" + reversed );
+     }
+
+     MultigetSliceQuery<ByteBuffer, ByteBuffer, ByteBuffer> multiget = createMultigetSliceQuery( ko, be, be, be );
+     multiget.setColumnFamily( columnFamily.toString() );
+     multiget.setKeys( bytebuffers( keys ) );
+
+     // DynamicComposite must be tested before List so that it keeps its own serialization
+     final ByteBuffer rangeStart;
+     if ( start instanceof DynamicComposite ) {
+         rangeStart = ( ( DynamicComposite ) start ).serialize();
+     }
+     else if ( start instanceof List ) {
+         rangeStart = DynamicComposite.toByteBuffer( ( List<?> ) start );
+     }
+     else {
+         rangeStart = bytebuffer( start );
+     }
+
+     final ByteBuffer rangeFinish;
+     if ( finish instanceof DynamicComposite ) {
+         rangeFinish = ( ( DynamicComposite ) finish ).serialize();
+     }
+     else if ( finish instanceof List ) {
+         rangeFinish = DynamicComposite.toByteBuffer( ( List<?> ) finish );
+     }
+     else {
+         rangeFinish = bytebuffer( finish );
+     }
+
+     multiget.setRange( rangeStart, rangeFinish, reversed, count );
+     Rows<ByteBuffer, ByteBuffer, ByteBuffer> rows = multiget.execute().get();
+
+     Map<ByteBuffer, List<HColumn<ByteBuffer, ByteBuffer>>> columnsByKey =
+             new LinkedHashMap<ByteBuffer, List<HColumn<ByteBuffer, ByteBuffer>>>();
+     for ( Row<ByteBuffer, ByteBuffer, ByteBuffer> row : rows ) {
+         columnsByKey.put( row.getKey(), row.getColumnSlice().getColumns() );
+     }
+
+     return columnsByKey;
+ }
+
+
+ /**
+  * Reads several rows in one multiget query, with an open-ended column range limited to
+  * {@code ALL_COUNT} columns.
+  *
+  * @param ko the keyspace to query
+  * @param columnFamily the column family; its {@code toString()} value is used as the name
+  * @param keys the row keys
+  * @param keySerializer serializer for row keys
+  * @param nameSerializer serializer for column names
+  * @param valueSerializer serializer for column values
+  *
+  * @return the rows returned by the query
+  *
+  * @throws Exception the exception
+  */
+ public <K, N, V> Rows<K, N, V> getRows( Keyspace ko, Object columnFamily, Collection<K> keys,
+         Serializer<K> keySerializer, Serializer<N> nameSerializer, Serializer<V> valueSerializer )
+         throws Exception {
+
+     if ( db_logger.isDebugEnabled() ) {
+         db_logger.debug( "getColumns cf=" + columnFamily + " keys=" + keys );
+     }
+
+     MultigetSliceQuery<K, N, V> multiget =
+             createMultigetSliceQuery( ko, keySerializer, nameSerializer, valueSerializer );
+     multiget.setColumnFamily( columnFamily.toString() );
+     multiget.setKeys( keys );
+     multiget.setRange( null, null, false, ALL_COUNT );
+     Rows<K, N, V> rows = multiget.execute().get();
+
+     if ( db_logger.isInfoEnabled() ) {
+         if ( rows != null ) {
+             db_logger.info( "getColumns returned " + rows.getCount() + " columns" );
+         }
+         else {
+             db_logger.info( "getColumns returned null" );
+         }
+     }
+
+     return rows;
+ }
+
+
+ /**
+  * Reads the named columns of a single row.
+  * The requested names are given as strings and converted to the column name type by a round
+  * trip through the string serializer and {@code nameSerializer}.
+  *
+  * @param ko the keyspace to query
+  * @param columnFamily the column family; its {@code toString()} value is used as the name
+  * @param key the row key
+  * @param columnNames the names of the columns to read
+  * @param nameSerializer serializer for column names
+  * @param valueSerializer serializer for column values
+  *
+  * @return the columns of the returned slice
+  *
+  * @throws Exception the exception
+  */
+ @SuppressWarnings("unchecked")
+ public <N, V> List<HColumn<N, V>> getColumns( Keyspace ko, Object columnFamily, Object key, Set<String> columnNames,
+         Serializer<N> nameSerializer, Serializer<V> valueSerializer ) throws Exception {
+
+     if ( db_logger.isDebugEnabled() ) {
+         db_logger.debug( "getColumns cf=" + columnFamily + " key=" + key + " names=" + columnNames );
+     }
+
+     SliceQuery<ByteBuffer, N, V> sliceQuery = createSliceQuery( ko, be, nameSerializer, valueSerializer );
+     sliceQuery.setColumnFamily( columnFamily.toString() );
+     sliceQuery.setKey( bytebuffer( key ) );
+
+     N[] requestedNames = ( N[] ) nameSerializer
+             .fromBytesSet( se.toBytesSet( new ArrayList<String>( columnNames ) ) ).toArray();
+     sliceQuery.setColumnNames( requestedNames );
+
+     List<HColumn<N, V>> columns = sliceQuery.execute().get().getColumns();
+
+     if ( db_logger.isInfoEnabled() ) {
+         if ( columns != null ) {
+             db_logger.info( "getColumns returned " + columns.size() + " columns" );
+         }
+         else {
+             db_logger.info( "getColumns returned null" );
+         }
+     }
+
+     return columns;
+ }
+
+
+ /**
+  * Reads the named columns of several rows in one multiget query.
+  * The requested names are given as strings and converted to the column name type by a round
+  * trip through the string serializer and {@code nameSerializer}.
+  *
+  * @param ko the keyspace to query
+  * @param columnFamily the column family; its {@code toString()} value is used as the name
+  * @param keys the row keys
+  * @param columnNames the names of the columns to read
+  * @param keySerializer serializer for row keys
+  * @param nameSerializer serializer for column names
+  * @param valueSerializer serializer for column values
+  *
+  * @return the rows returned by the query
+  *
+  * @throws Exception the exception
+  */
+ @SuppressWarnings("unchecked")
+ public <K, N, V> Rows<K, N, V> getRows( Keyspace ko, Object columnFamily, Collection<K> keys,
+         Collection<String> columnNames, Serializer<K> keySerializer, Serializer<N> nameSerializer,
+         Serializer<V> valueSerializer ) throws Exception {
+
+     if ( db_logger.isDebugEnabled() ) {
+         db_logger.debug( "getColumns cf=" + columnFamily + " keys=" + keys + " names=" + columnNames );
+     }
+
+     MultigetSliceQuery<K, N, V> multiget =
+             createMultigetSliceQuery( ko, keySerializer, nameSerializer, valueSerializer );
+     multiget.setColumnFamily( columnFamily.toString() );
+     multiget.setKeys( keys );
+
+     N[] requestedNames = ( N[] ) nameSerializer
+             .fromBytesSet( se.toBytesSet( new ArrayList<String>( columnNames ) ) ).toArray();
+     multiget.setColumnNames( requestedNames );
+
+     Rows<K, N, V> rows = multiget.execute().get();
+
+     if ( db_logger.isInfoEnabled() ) {
+         if ( rows != null ) {
+             db_logger.info( "getColumns returned " + rows.getCount() + " columns" );
+         }
+         else {
+             db_logger.info( "getColumns returned null" );
+         }
+     }
+
+     return rows;
+ }
+
+
+ /**
+  * Reads a single column of a single row.
+  *
+  * @param ko the keyspace to query
+  * @param columnFamily the column family; its {@code toString()} value is used as the name
+  * @param key the row key
+  * @param column the column name
+  * @param nameSerializer serializer for the column name
+  * @param valueSerializer serializer for the column value
+  *
+  * @return the value the query result holds for that column (a null result is logged at info level)
+  *
+  * @throws Exception the exception
+  */
+ public <N, V> HColumn<N, V> getColumn( Keyspace ko, Object columnFamily, Object key, N column,
+         Serializer<N> nameSerializer, Serializer<V> valueSerializer ) throws Exception {
+
+     if ( db_logger.isDebugEnabled() ) {
+         db_logger.debug( "getColumn cf=" + columnFamily + " key=" + key + " column=" + column );
+     }
+
+     ColumnQuery<ByteBuffer, N, V> columnQuery =
+             HFactory.createColumnQuery( ko, be, nameSerializer, valueSerializer );
+     HColumn<N, V> found = columnQuery.setKey( bytebuffer( key ) )
+                                      .setName( column )
+                                      .setColumnFamily( columnFamily.toString() )
+                                      .execute()
+                                      .get();
+
+     if ( db_logger.isInfoEnabled() && found == null ) {
+         db_logger.info( "getColumn returned null" );
+     }
+
+     return found;
+ }
+
+
+ /**
+  * Reads the given columns of a single row and returns them as a slice.
+  *
+  * @param ko the keyspace to query
+  * @param columnFamily the column family; its {@code toString()} value is used as the name
+  * @param key the row key
+  * @param columns the names of the columns to read
+  * @param nameSerializer serializer for column names
+  * @param valueSerializer serializer for column values
+  *
+  * @return the column slice held by the query result
+  *
+  * @throws Exception the exception
+  */
+ public <N, V> ColumnSlice<N, V> getColumns( Keyspace ko, Object columnFamily, Object key, N[] columns,
+         Serializer<N> nameSerializer, Serializer<V> valueSerializer ) throws Exception {
+
+     if ( db_logger.isDebugEnabled() ) {
+         // Arrays.toString prints the column names; concatenating the array itself only prints its identity
+         db_logger.debug( "getColumn cf=" + columnFamily + " key=" + key + " column="
+                 + java.util.Arrays.toString( columns ) );
+     }
+
+     SliceQuery<ByteBuffer, N, V> q = HFactory.createSliceQuery( ko, be, nameSerializer, valueSerializer );
+     QueryResult<ColumnSlice<N, V>> r =
+             q.setKey( bytebuffer( key ) ).setColumnNames( columns ).setColumnFamily( columnFamily.toString() )
+              .execute();
+     ColumnSlice<N, V> result = r.get();
+
+     if ( db_logger.isDebugEnabled() && result == null ) {
+         db_logger.debug( "getColumn returned null" );
+     }
+
+     return result;
+ }
+
+
+ /**
+ * Convenience overload of getColumn() for a string column name whose value is returned as a raw byte buffer.
+ *
+ * @param ko the keyspace to query
+ * @param columnFamily the column family
+ * @param key the row key
+ * @param column the column name
+ *
+ * @return the result of the generic getColumn() call
+ *
+ * @throws Exception the exception
+ */
+ public HColumn<String, ByteBuffer> getColumn( Keyspace ko, Object columnFamily, Object key, String column )
+ throws Exception {
+ return getColumn( ko, columnFamily, key, column, se, be );
+ }
+
+
+ /**
+ * Writes a single column without a time-to-live; delegates to the ttl overload with ttl 0, which that overload
+ * treats as "do not set a TTL".
+ *
+ * @param ko the keyspace to write to
+ * @param columnFamily the column family
+ * @param key the row key
+ * @param columnName the column name
+ * @param columnValue the column value
+ *
+ * @throws Exception the exception
+ */
+ public void setColumn( Keyspace ko, Object columnFamily, Object key, Object columnName, Object columnValue )
+ throws Exception {
+ this.setColumn( ko, columnFamily, key, columnName, columnValue, 0 );
+ }
+
+
+ /**
+  * Writes a single column, optionally with a time-to-live.
+  * A List name or value is serialized as a DynamicComposite; anything else goes through
+  * {@code bytebuffer()}.
+  *
+  * @param ko the keyspace to write to
+  * @param columnFamily the column family; its {@code toString()} value is used as the name
+  * @param key the row key
+  * @param columnName the column name
+  * @param columnValue the column value
+  * @param ttl time-to-live to set on the column; 0 means no TTL is set
+  *
+  * @throws Exception the exception
+  */
+ public void setColumn( Keyspace ko, Object columnFamily, Object key, Object columnName, Object columnValue,
+         int ttl ) throws Exception {
+
+     if ( db_logger.isDebugEnabled() ) {
+         db_logger.debug( "setColumn cf=" + columnFamily + " key=" + key + " name=" + columnName + " value="
+                 + columnValue );
+     }
+
+     ByteBuffer nameBytes = columnName instanceof List
+             ? DynamicComposite.toByteBuffer( ( List<?> ) columnName )
+             : bytebuffer( columnName );
+
+     ByteBuffer valueBytes = columnValue instanceof List
+             ? DynamicComposite.toByteBuffer( ( List<?> ) columnValue )
+             : bytebuffer( columnValue );
+
+     HColumn<ByteBuffer, ByteBuffer> column = createColumn( nameBytes, valueBytes, be, be );
+     if ( ttl != 0 ) {
+         column.setTtl( ttl );
+     }
+
+     Mutator<ByteBuffer> mutator = createMutator( ko, be );
+     mutator.insert( bytebuffer( key ), columnFamily.toString(), column );
+ }
+
+
+ /**
+ * Writes the entries of a map as columns of one row, without a time-to-live. Delegates to the ttl overload with
+ * ttl 0.
+ *
+ * @param ko the keyspace to write to
+ * @param columnFamily the column family
+ * @param key the row key
+ * @param map column names mapped to column values
+ *
+ * @throws Exception the exception
+ */
+ public void setColumns( Keyspace ko, Object columnFamily, byte[] key, Map<?, ?> map ) throws Exception {
+ this.setColumns( ko, columnFamily, key, map, 0 );
+ }
+
+
+ public void setColumns( Keyspace ko, Object columnFamily, byte[] key, Map<?, ?> map, int ttl ) throws Exception {
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "setColumns cf=" + columnFamily + " key=" + key + " map=" + map + ( ttl != 0 ?
+ " ttl=" + ttl : "" ) );
+ }
+
+ Mutator<ByteBuffer> m = createMutator( ko, be );
+ long timestamp = createTimestamp();
+
+ for ( Object name : map.keySet() ) {
+ Object value = map.get( name );
+ if ( value != null ) {
+
+ ByteBuffer name_bytes = null;
+ if ( name instanceof List ) {
+ name_bytes = DynamicComposite.toByteBuffer( ( List<?> ) name );
+ }
+ else {
+ name_bytes = bytebuffer( name );
+ }
+
+ ByteBuffer value_bytes = null;
+ if ( value instanceof List ) {
+ value_bytes = DynamicComposite.toByteBuffer( ( List<?> ) value );
+ }
+ else {
+ value_bytes = bytebuffer( value );
+ }
+
+ HColumn<ByteBuffer, ByteBuffer> col = createColumn( name_bytes, value_bytes, timestamp, be, be );
+ if ( ttl != 0 ) {
+ col.setTtl( ttl );
+ }
+ m.addInsertion( bytebuffer( key ), columnFamily.toString(),
+ createColumn( name_bytes, value_bytes, timestamp, be, be ) );
+ }
+ }
+ batchExecute( m, CassandraService.RETRY_COUNT );
+ }
+
+
+ /**
+ * Creates a timestamp using the clock resolution configured on the CassandraHostConfigurator (chc).
+ *
+ * @return the value returned by the clock resolution's createClock()
+ */
+ public long createTimestamp() {
+ return chc.getClockResolution().createClock();
+ }
+
+
+ /**
+ * Deletes a single column of a row.
+ *
+ * @param ko the keyspace to write to
+ * @param columnFamily the column family; its toString() value is used as the name
+ * @param key the row key
+ * @param column the name of the column to delete, converted with bytebuffer()
+ *
+ * @throws Exception the exception
+ */
+ public void deleteColumn( Keyspace ko, Object columnFamily, Object key, Object column ) throws Exception {
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "deleteColumn cf=" + columnFamily + " key=" + key + " name=" + column );
+ }
+
+ Mutator<ByteBuffer> m = createMutator( ko, be );
+ m.delete( bytebuffer( key ), columnFamily.toString(), bytebuffer( column ), be );
+ }
+
+
+ /**
+  * Collects the row keys of a column family with a range query over an open key range,
+  * requesting no columns.
+  *
+  * @param ko the keyspace to query
+  * @param columnFamily the column family; its {@code toString()} value is used as the name
+  * @param keySerializer serializer for row keys
+  *
+  * @return the row keys as a set, in the order the rows were returned
+  *
+  * @throws Exception the exception
+  */
+ public <K> Set<K> getRowKeySet( Keyspace ko, Object columnFamily, Serializer<K> keySerializer ) throws Exception {
+
+     if ( db_logger.isDebugEnabled() ) {
+         db_logger.debug( "getRowKeys cf=" + columnFamily );
+     }
+
+     RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeQuery = createRangeSlicesQuery( ko, keySerializer, be, be );
+     rangeQuery.setColumnFamily( columnFamily.toString() );
+     rangeQuery.setKeys( null, null );
+     rangeQuery.setColumnNames( new ByteBuffer[0] );
+     OrderedRows<K, ByteBuffer, ByteBuffer> rows = rangeQuery.execute().get();
+
+     Set<K> keys = new LinkedHashSet<K>();
+     for ( Row<K, ByteBuffer, ByteBuffer> row : rows ) {
+         keys.add( row.getKey() );
+     }
+
+     if ( db_logger.isDebugEnabled() ) {
+         db_logger.debug( "getRowKeys returned " + keys.size() + " rows" );
+     }
+
+     return keys;
+ }
+
+
+ /**
+ * Gets the row keys of a column family as a list.
+ *
+ * @param ko the keyspace
+ * @param columnFamily the column family
+ * @param keySerializer serializer used for the row keys
+ *
+ * @return list of row keys, in the order the range query returned them
+ *
+ * @throws Exception the exception
+ */
+ public <K> List<K> getRowKeyList( Keyspace ko, Object columnFamily, Serializer<K> keySerializer ) throws Exception {
+
+ // open-ended key range with no column names requested
+ RangeSlicesQuery<K, ByteBuffer, ByteBuffer> query = createRangeSlicesQuery( ko, keySerializer, be, be );
+ query.setColumnFamily( columnFamily.toString() );
+ query.setKeys( null, null );
+ query.setColumnNames( new ByteBuffer[0] );
+
+ OrderedRows<K, ByteBuffer, ByteBuffer> fetched = query.execute().get();
+
+ List<K> keys = new ArrayList<K>();
+ for ( Row<K, ByteBuffer, ByteBuffer> current : fetched ) {
+ keys.add( current.getKey() );
+ }
+
+ return keys;
+ }
+
+
+ /**
+ * Deletes a whole row, addressing it by a key that is converted with {@code bytebuffer(...)}.
+ *
+ * @param ko the keyspace
+ * @param columnFamily the column family; its {@code toString()} value is used as the column family name
+ * @param key the row key
+ *
+ * @throws Exception the exception
+ */
+ public void deleteRow( Keyspace ko, final Object columnFamily, final Object key ) throws Exception {
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "deleteRow cf=" + columnFamily + " key=" + key );
+ }
+
+ final Mutator<ByteBuffer> pending =
+ createMutator( ko, be ).addDeletion( bytebuffer( key ), columnFamily.toString() );
+ pending.execute();
+ }
+
+
+ /**
+ * Deletes a whole row whose key is a string; the mutator is created with the {@code se} serializer.
+ *
+ * @param ko the keyspace
+ * @param columnFamily the column family; its {@code toString()} value is used as the column family name
+ * @param key the row key
+ *
+ * @throws Exception the exception
+ */
+ public void deleteRow( Keyspace ko, final Object columnFamily, final String key ) throws Exception {
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "deleteRow cf=" + columnFamily + " key=" + key );
+ }
+
+ final Mutator<String> pending = createMutator( ko, se ).addDeletion( key, columnFamily.toString() );
+ pending.execute();
+ }
+
+
+ /**
+ * Deletes a whole row using an explicit deletion timestamp.
+ *
+ * @param ko the keyspace
+ * @param columnFamily the column family; its {@code toString()} value is used as the column family name
+ * @param key the row key
+ * @param timestamp the timestamp passed along with the deletion
+ *
+ * @throws Exception the exception
+ */
+ public void deleteRow( Keyspace ko, final Object columnFamily, final Object key, final long timestamp )
+ throws Exception {
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "deleteRow cf=" + columnFamily + " key=" + key + " timestamp=" + timestamp );
+ }
+
+ final Mutator<ByteBuffer> pending =
+ createMutator( ko, be ).addDeletion( bytebuffer( key ), columnFamily.toString(), timestamp );
+ pending.execute();
+ }
+
+
+ /**
+ * Builds an index scanner over the ids stored in the {@code ENTITY_ID_SETS} column family for a collection.
+ *
+ * @param ko the keyspace (not referenced by this method's body)
+ * @param key the key
+ * @param start the start; {@code NULL_ID} is treated the same as {@code null}
+ * @param finish the finish
+ * @param count the count; values of zero or less are replaced with {@code DEFAULT_COUNT}
+ * @param reversed True if the scan should be reversed
+ * @param locator The index locator instance
+ * @param applicationId The applicationId
+ * @param collectionName The name of the collection to get the Ids for
+ * @param keepFirst When false and a start is supplied, the scanner is created with its skip-first flag set;
+ * when true the flag is never set
+ *
+ * @return an {@link IndexBucketScanner} configured with the arguments above
+ *
+ * @throws Exception the exception
+ */
+ public IndexScanner getIdList( Keyspace ko, Object key, UUID start, UUID finish, int count, boolean reversed,
- IndexBucketLocator locator, UUID applicationId, String collectionName )
++ IndexBucketLocator locator, UUID applicationId, String collectionName, boolean keepFirst )
+ throws Exception {
+
+ if ( count <= 0 ) {
+ count = DEFAULT_COUNT;
+ }
+
+ if ( NULL_ID.equals( start ) ) {
+ start = null;
+ }
+
++
++ final boolean skipFirst = start != null && !keepFirst; // only skip when there is a start to skip
++
+ IndexScanner scanner =
+ new IndexBucketScanner( this, locator, ENTITY_ID_SETS, applicationId, IndexType.COLLECTION, key, start,
- finish, reversed, count, collectionName );
++ finish, reversed, count, skipFirst, collectionName );
+
+ return scanner;
+ }
+
+
+ /**
+ * Counts the columns of a single row.
+ *
+ * @param ko the keyspace
+ * @param columnFamily the column family; its {@code toString()} value is used as the column family name
+ * @param key the row key
+ *
+ * @return the value of the count query, or 0 when the query yields no result object
+ *
+ * @throws Exception the exception
+ */
+ public int countColumns( Keyspace ko, Object columnFamily, Object key ) throws Exception {
+
+ CountQuery<ByteBuffer, ByteBuffer> counter = HFactory.createCountQuery( ko, be, be );
+ counter.setColumnFamily( columnFamily.toString() );
+ counter.setKey( bytebuffer( key ) );
+ // empty start and finish buffers, with 100000000 as the range's count argument
+ counter.setRange( ByteBuffer.allocate( 0 ), ByteBuffer.allocate( 0 ), 100000000 );
+
+ QueryResult<Integer> outcome = counter.execute();
+ return outcome == null ? 0 : outcome.get();
+ }
+
+
+ /**
+ * Writes an id list into the {@code ENTITY_ID_SETS} column family as one batch.
+ *
+ * @param ko the keyspace
+ * @param targetId the target id
+ * @param keyPrefix the key prefix
+ * @param keySuffix the key suffix
+ * @param keyIds the key ids
+ *
+ * @throws Exception the exception
+ */
+ public void setIdList( Keyspace ko, UUID targetId, String keyPrefix, String keySuffix, List<UUID> keyIds )
+ throws Exception {
+ final long timestamp = createTimestamp();
+ final Mutator<ByteBuffer> emptyBatch = createMutator( ko, be );
+ final Mutator<ByteBuffer> filledBatch =
+ buildSetIdListMutator( emptyBatch, targetId, ENTITY_ID_SETS.toString(), keyPrefix, keySuffix, keyIds,
+ timestamp );
+ batchExecute( filledBatch, CassandraService.RETRY_COUNT );
+ }
+
+
+ /**
+ * Result of the most recent health check. Declared volatile because it is written by the health-check
+ * executor thread and read from other threads.
+ */
+ volatile boolean clusterUp = false;
+
+ /** Executor running the periodic health check; null while no check is scheduled. */
+ private ScheduledExecutorService healthCheckExecutor;
+
+
+ /**
+ * Schedules a periodic check that sets {@code clusterUp} to whether the connection manager reports at
+ * least one host. The first check runs after 1 second and each following check 5 seconds after the
+ * previous one finished. {@code clusterUp} is left untouched while the cluster or its connection manager
+ * is null. Calling this again while a check is already scheduled does nothing.
+ */
+ public void startClusterHealthCheck() {
+
+ if ( healthCheckExecutor != null ) {
+ // already scheduled; a second executor would only duplicate the work and leak a thread
+ return;
+ }
+
+ healthCheckExecutor = Executors.newSingleThreadScheduledExecutor();
+ healthCheckExecutor.scheduleWithFixedDelay( new Runnable() {
+ @Override
+ public void run() {
+ if ( cluster != null ) {
+ HConnectionManager connectionManager = cluster.getConnectionManager();
+ if ( connectionManager != null ) {
+ clusterUp = !connectionManager.getHosts().isEmpty();
+ }
+ }
+ }
+ }, 1, 5, TimeUnit.SECONDS );
+ }
+
+ /**
+ * Stops the health check (if one was started), shuts down the cluster's connection manager and drops the
+ * cluster reference.
+ *
+ * @throws Exception the exception
+ */
+ public void destroy() throws Exception {
+ // stop the health check first so it cannot run against a cluster that is being torn down
+ if ( healthCheckExecutor != null ) {
+ healthCheckExecutor.shutdownNow();
+ healthCheckExecutor = null;
+ }
+ if ( cluster != null ) {
+ HConnectionManager connectionManager = cluster.getConnectionManager();
+ if ( connectionManager != null ) {
+ connectionManager.shutdown();
+ }
+ }
+ cluster = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9fec2baa/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
index 2c43b4e,0000000..c5bddaf
mode 100644,000000..100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessor.java
@@@ -1,719 -1,0 +1,719 @@@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Stack;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Identifier;
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.Schema;
+import org.apache.usergrid.persistence.Query.SortDirection;
+import org.apache.usergrid.persistence.Query.SortPredicate;
+import org.apache.usergrid.persistence.entities.User;
+import org.apache.usergrid.persistence.exceptions.NoFullTextIndexException;
+import org.apache.usergrid.persistence.exceptions.NoIndexException;
+import org.apache.usergrid.persistence.exceptions.PersistenceException;
+import org.apache.usergrid.persistence.query.ir.AllNode;
+import org.apache.usergrid.persistence.query.ir.AndNode;
+import org.apache.usergrid.persistence.query.ir.EmailIdentifierNode;
+import org.apache.usergrid.persistence.query.ir.NameIdentifierNode;
+import org.apache.usergrid.persistence.query.ir.NotNode;
+import org.apache.usergrid.persistence.query.ir.OrNode;
+import org.apache.usergrid.persistence.query.ir.OrderByNode;
+import org.apache.usergrid.persistence.query.ir.QueryNode;
+import org.apache.usergrid.persistence.query.ir.QuerySlice;
+import org.apache.usergrid.persistence.query.ir.SearchVisitor;
+import org.apache.usergrid.persistence.query.ir.SliceNode;
+import org.apache.usergrid.persistence.query.ir.UuidIdentifierNode;
+import org.apache.usergrid.persistence.query.ir.WithinNode;
+import org.apache.usergrid.persistence.query.ir.result.ResultIterator;
+import org.apache.usergrid.persistence.query.ir.result.ResultsLoader;
+import org.apache.usergrid.persistence.query.ir.result.ResultsLoaderFactory;
+import org.apache.usergrid.persistence.query.ir.result.ScanColumn;
+import org.apache.usergrid.persistence.query.tree.AndOperand;
+import org.apache.usergrid.persistence.query.tree.ContainsOperand;
+import org.apache.usergrid.persistence.query.tree.Equal;
+import org.apache.usergrid.persistence.query.tree.EqualityOperand;
+import org.apache.usergrid.persistence.query.tree.GreaterThan;
+import org.apache.usergrid.persistence.query.tree.GreaterThanEqual;
+import org.apache.usergrid.persistence.query.tree.LessThan;
+import org.apache.usergrid.persistence.query.tree.LessThanEqual;
+import org.apache.usergrid.persistence.query.tree.Literal;
+import org.apache.usergrid.persistence.query.tree.NotOperand;
+import org.apache.usergrid.persistence.query.tree.Operand;
+import org.apache.usergrid.persistence.query.tree.OrOperand;
+import org.apache.usergrid.persistence.query.tree.QueryVisitor;
+import org.apache.usergrid.persistence.query.tree.StringLiteral;
+import org.apache.usergrid.persistence.query.tree.WithinOperand;
+import org.apache.usergrid.persistence.schema.CollectionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
+
+
+public class QueryProcessor {
+
- private static final int PAGE_SIZE = 1000;
++ public static final int PAGE_SIZE = 1000;
+ private static final Logger logger = LoggerFactory.getLogger( QueryProcessor.class );
+
+ private static final Schema SCHEMA = getDefaultSchema();
+
+ private final CollectionInfo collectionInfo;
+ private final EntityManager em;
+ private final ResultsLoaderFactory loaderFactory;
+
+ private Operand rootOperand;
+ private List<SortPredicate> sorts;
+ private CursorCache cursorCache;
+ private QueryNode rootNode;
+ private String entityType;
+
+ private int size;
+ private Query query;
+ private int pageSizeHint;
+
+
+ public QueryProcessor( Query query, CollectionInfo collectionInfo, EntityManager em,
+ ResultsLoaderFactory loaderFactory ) throws PersistenceException {
+ setQuery( query ); // caches the sorts, cursor, root operand, entity type and limit of the query
+ this.collectionInfo = collectionInfo;
+ this.em = em;
+ this.loaderFactory = loaderFactory;
+ process(); // builds rootNode and pageSizeHint; must run after the fields above are set
+ }
+
+
+ /** @return the query most recently passed to {@link #setQuery(Query)} */
+ public Query getQuery() {
+ return this.query;
+ }
+
+
+ public void setQuery( Query query ) { // copies the parts of the query this processor works from
+ this.sorts = query.getSortPredicates();
+ this.cursorCache = new CursorCache( query.getCursor() ); // fresh cache built from the query's cursor
+ this.rootOperand = query.getRootOperand();
+ this.entityType = query.getEntityType();
+ this.size = query.getLimit(); // the query limit is used as the result size
+ this.query = query;
+ }
+
+
+ /** @return the collection info supplied at construction time */
+ public CollectionInfo getCollectionInfo() {
+ return this.collectionInfo;
+ }
+
+
+ private void process() throws PersistenceException { // builds rootNode and pageSizeHint for the current query
+
+ int opCount = 0;
+
+ // a filter expression was supplied: translate its operand tree into IR nodes
+ if ( rootOperand != null ) {
+ // visit the tree
+
+ TreeEvaluator visitor = new TreeEvaluator();
+
+ rootOperand.visit( visitor );
+
+ rootNode = visitor.getRootNode();
+
+ opCount = visitor.getSliceCount();
+ }
+
+ // see if we have sorts, if so, we can add them all as a single node at
+ // the root
+ if ( sorts.size() > 0 ) {
+
+ OrderByNode order = generateSorts( opCount );
+
+ opCount += order.getFirstPredicate().getAllSlices().size();
+
+ rootNode = order;
+ }
+
+
+ //if we still don't have a root node, no query nor order by was specified,
+ // just use the all node or the identifiers
+ if ( rootNode == null ) {
+
+
+ //a name alias or email alias was specified
+ if ( query.containsSingleNameOrEmailIdentifier() ) {
+
+ Identifier ident = query.getSingleIdentifier();
+
+ //an email was specified. An edge case that only applies to users. This is ugly to put here,
+ // but required
+ if ( query.getEntityType().equals( User.ENTITY_TYPE ) && ident.isEmail() ) {
+ rootNode = new EmailIdentifierNode( ident );
+ }
+
+ //use the ident with the default alias. could be an email
+ else {
+ rootNode = new NameIdentifierNode( ident.getName() );
+ }
+ }
+ //a uuid was specified
+ else if ( query.containsSingleUuidIdentifier() ) {
+ rootNode = new UuidIdentifierNode( query.getSingleUuidIdentifier() );
+ }
+
+
+ //nothing was specified, order it by uuid
+ else {
+
+
+ //this is a bit ugly, but how we handle the start parameter
+ UUID startResult = query.getStartResult();
+
+ boolean startResultSet = startResult != null;
+
+ AllNode allNode = new AllNode( 0, startResultSet );
+
+ if ( startResultSet ) {
+ cursorCache.setNextCursor( allNode.getSlice().hashCode(),
+ UUIDSerializer.get().toByteBuffer( startResult ) );
+ }
+
+ rootNode = allNode;
+ }
+ }
+
+ if ( opCount > 1 ) { // more than one slice: always use the full PAGE_SIZE
+ pageSizeHint = PAGE_SIZE;
+ }
+ else { // at most one slice: the requested size, capped at PAGE_SIZE
+ pageSizeHint = Math.min( size, PAGE_SIZE );
+ }
+ }
+
+
+ /** @return the root node of the IR tree, or null when none has been built */
+ public QueryNode getFirstNode() {
+ return this.rootNode;
+ }
+
+
+ /**
+ * Applies the sort direction and then the cursor position to a slice. This should only be invoked at
+ * evaluation time, to ensure that the IR tree has already been fully constructed.
+ *
+ * @param slice the slice to adjust in place
+ */
+ public void applyCursorAndSort( QuerySlice slice ) {
+ // the sort goes first because reversing the slice can change its hash code, which keys the cursor lookup
+ final SortPredicate predicate = getSort( slice.getPropertyName() );
+
+ if ( predicate != null ) {
+ final boolean wantDescending = predicate.getDirection() == SortDirection.DESCENDING;
+
+ // flip the slice only when its current direction disagrees with the requested one
+ if ( wantDescending != slice.isReversed() ) {
+ slice.reverse();
+ }
+ }
+
+ final ByteBuffer savedPosition = cursorCache.getCursorBytes( slice.hashCode() );
+
+ if ( savedPosition == null ) {
+ return;
+ }
+
+ slice.setCursor( savedPosition );
+ }
+
+
+ /**
+ * Return the cursor bytes held in the cursor cache for the given node id
+ * @param nodeId the id the cursor was stored under
+ * @return whatever the cursor cache returns for that id
+ */
+ public ByteBuffer getCursorCache(int nodeId){
+ return cursorCache.getCursorBytes( nodeId );
+ }
+
+
+ /**
+ * Finds the first sort predicate declared for a property.
+ *
+ * @param propertyName the property to look for
+ *
+ * @return the matching predicate, or null when the property is not sorted on
+ */
+ private SortPredicate getSort( String propertyName ) {
+ for ( SortPredicate candidate : sorts ) {
+ if ( !candidate.getPropertyName().equals( propertyName ) ) {
+ continue;
+ }
+ return candidate;
+ }
+ return null;
+ }
+
+
+ /**
+ * Return the iterator results, ordered if required.
+ *
+ * Visits the IR tree with the supplied visitor, gathers up to {@code size} scan columns from the resulting
+ * iterator, and hands them to the results loader.
+ *
+ * @param visitor the visitor used to evaluate the IR tree
+ *
+ * @return the loaded results with cursor, query, processor and visitor attached; null when there is no root
+ * node or the loader returns null
+ *
+ * @throws Exception the exception
+ */
+ public Results getResults( SearchVisitor visitor ) throws Exception {
+ // no IR tree was built, so there is nothing to evaluate
+
+ if ( rootNode == null ) {
+ return null;
+ }
+
+ rootNode.visit( visitor );
+
+ ResultIterator itr = visitor.getResults();
+
+ List<ScanColumn> entityIds = new ArrayList<ScanColumn>( Math.min( size, Query.MAX_LIMIT ) );
+
+ CursorCache resultsCursor = new CursorCache();
+
+ while ( entityIds.size() < size && itr.hasNext() ) { // each next() adds a whole page, so this can overshoot
+ entityIds.addAll( itr.next() );
+ }
+
+ //trim to the requested size; the cursor is only finalized when a full page is being returned
+ if ( entityIds.size() > 0 ) {
+ int resultSize = Math.min( entityIds.size(), size );
+ entityIds = entityIds.subList( 0, resultSize );
+
+ if ( resultSize == size ) {
+ itr.finalizeCursor( resultsCursor, entityIds.get( resultSize - 1 ).getUUID() );
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Getting result for query: [{}], returning entityIds size: {}", getQuery(), entityIds.size());
+ }
+
+ final ResultsLoader loader = loaderFactory.getResultsLoader( em, query, query.getResultsLevel() );
+ final Results results = loader.getResults( entityIds );
+
+ if ( results == null ) {
+ return null;
+ }
+
+ // now we need to set the cursor from our tree evaluation for return
+ results.setCursor( resultsCursor.asString() );
+
+ results.setQuery( query );
+ results.setQueryProcessor( this );
+ results.setSearchVisitor( visitor );
+
+ return results;
+ }
+
+
+ private class TreeEvaluator implements QueryVisitor {
+
+ // stack for nodes that will be used to construct the tree and create
+ // objects
+ private CountingStack<QueryNode> nodes = new CountingStack<QueryNode>();
+
+
+ private int contextCount = -1;
+
+
+ /**
+ * Get the root node in our tree for runtime evaluation.
+ *
+ * This is the entry on top of the node stack; the stack is not modified. Because the stack is a
+ * {@link java.util.Stack}, peeking at an empty stack throws {@link java.util.EmptyStackException}.
+ */
+ public QueryNode getRootNode() {
+ return nodes.peek();
+ }
+
+
+ /*
+ * Visits both children of an AND. When both visits leave the same node on top of the stack nothing
+ * more is done; otherwise the top two nodes are replaced by a single AndNode.
+ *
+ * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
+ * .persistence.query.tree.AndOperand)
+ */
+ @Override
+ public void visit( AndOperand op ) throws PersistenceException {
+
+ op.getLeft().visit( this );
+
+ QueryNode leftResult = nodes.peek();
+
+ op.getRight().visit( this );
+
+ QueryNode rightResult = nodes.peek();
+
+ // if the result of the left and right are the same, we don't want
+ // to create an AND. We'll use the same SliceNode. Do nothing
+ if ( leftResult == rightResult ) {
+ return;
+ }
+
+ // otherwise create a new AND node from the result of the visit
+
+ QueryNode right = nodes.pop(); // the right child was pushed last, so it comes off first
+ QueryNode left = nodes.pop();
+
+ AndNode newNode = new AndNode( left, right );
+
+ nodes.push( newNode );
+ }
+
+
+ /*
+ * Visits both children of an OR, each in its own slice context where createNewSlice decides one is
+ * needed, then replaces the top two nodes of the stack with a single OrNode.
+ *
+ * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
+ * .persistence.query.tree.OrOperand)
+ */
+ @Override
+ public void visit( OrOperand op ) throws PersistenceException {
+
+ // we need to create a new slicenode for the children of this
+ // operation
+
+ Operand left = op.getLeft();
+ Operand right = op.getRight();
+
+ // createNewSlice pushes a fresh slice node when the child is an equality,
+ // AND or contains operand; other children push their own nodes
+ createNewSlice( left );
+
+ left.visit( this );
+
+ // same rule for the right child, so the two sides never share a slice
+ // node
+ createNewSlice( right );
+
+ right.visit( this );
+
+ QueryNode rightResult = nodes.pop(); // the right child was pushed last, so it comes off first
+ QueryNode leftResult = nodes.pop();
+
+ // rewrite with the new Or operand
+ OrNode orNode = new OrNode( leftResult, rightResult, ++contextCount );
+
+ nodes.push( orNode );
+ }
+
+
+ /*
+ * Visits the child of a NOT in its own slice context and wraps the resulting node, together with a
+ * new AllNode, in a NotNode.
+ *
+ * @see QueryVisitor#visit(NotOperand)
+ */
+ @Override
+ public void visit( NotOperand op ) throws PersistenceException {
+
+ // any child of NOT has to be evaluated independently, hence the new context
+ final Operand negatedOperand = op.getOperation();
+ createNewSlice( negatedOperand );
+ negatedOperand.visit( this );
+
+ final QueryNode negated = nodes.pop();
+ final AllNode everything = new AllNode( ++contextCount, false );
+
+ nodes.push( new NotNode( negated, everything ) );
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
+ * .persistence.query.tree.ContainsOperand)
+ */
+ @Override
+ public void visit( ContainsOperand op ) throws NoFullTextIndexException {
+
+ String propertyName = op.getProperty().getValue();
+
+ if ( !SCHEMA.isPropertyFulltextIndexed( entityType, propertyName ) ) {
+ throw new NoFullTextIndexException( entityType, propertyName );
+ }
+
+ StringLiteral string = op.getString();
+
+ String indexName = op.getProperty().getIndexedValue();
+
+ SliceNode node = null;
+
+ // sdg - if left & right have same field name, we need to create a new
+ // slice
+ if ( !nodes.isEmpty() && nodes.peek() instanceof SliceNode
+ && ( ( SliceNode ) nodes.peek() ).getSlice( indexName ) != null ) {
+ node = newSliceNode();
+ }
+ else {
+ node = getUnionNode( op );
+ }
+
+ String fieldName = op.getProperty().getIndexedValue();
+
+ node.setStart( fieldName, string.getValue(), true );
+ node.setFinish( fieldName, string.getEndValue(), true );
+ }
+
+
+ /*
+ * Pushes a WithinNode built from the operand's indexed property name, distance, latitude and
+ * longitude, under a newly allocated context id.
+ *
+ * @see org.apache.usergrid.persistence.query.tree.QueryVisitor#visit(org.apache.usergrid
+ * .persistence.query.tree.WithinOperand)
+ */
+ @Override
+ public void visit( WithinOperand op ) {
+
+ // the node is keyed by the property's indexed name rather than its plain value
+ nodes.push( new WithinNode( op.getProperty().getIndexedName(), op.getDistance().getFloatValue(),
+ op.getLattitude().getFloatValue(), op.getLongitude().getFloatValue(), ++contextCount ) );
+ }
+
+
+ /*
+ * Sets the finish of the property's range on the current slice node, passing false as the final
+ * argument (presumably the inclusive flag -- confirm against SliceNode).
+ *
+ * @see QueryVisitor#visit(LessThan)
+ */
+ @Override
+ public void visit( LessThan op ) throws NoIndexException {
+ final String propertyName = op.getProperty().getValue();
+
+ checkIndexed( propertyName );
+
+ final SliceNode target = getUnionNode( op );
+ target.setFinish( propertyName, op.getLiteral().getValue(), false );
+ }
+
+
+ /*
+ * Sets the finish of the property's range on the current slice node, passing true as the final
+ * argument (presumably the inclusive flag -- confirm against SliceNode).
+ *
+ * @see QueryVisitor#visit(LessThanEqual)
+ */
+ @Override
+ public void visit( LessThanEqual op ) throws NoIndexException {
+
+ final String propertyName = op.getProperty().getValue();
+
+ checkIndexed( propertyName );
+
+ final SliceNode target = getUnionNode( op );
+ target.setFinish( propertyName, op.getLiteral().getValue(), true );
+ }
+
+
+ /*
+ * Sets both ends of the property's range on the current slice node to express equality. String
+ * literals use their end value as the finish, and set no finish at all when that end value is null.
+ *
+ * @see QueryVisitor#visit(Equal)
+ */
+ @Override
+ public void visit( Equal op ) throws NoIndexException {
+ final String fieldName = op.getProperty().getValue();
+
+ checkIndexed( fieldName );
+
+ final Literal<?> literal = op.getLiteral();
+ final SliceNode target = getUnionNode( op );
+
+ // this is an edge case. If we get more edge cases, we need to push
+ // this down into the literals and let the objects
+ // handle this
+ if ( !( literal instanceof StringLiteral ) ) {
+ target.setFinish( fieldName, literal.getValue(), true );
+ }
+ else {
+ final String endValue = ( ( StringLiteral ) literal ).getEndValue();
+
+ if ( endValue != null ) {
+ target.setFinish( fieldName, endValue, true );
+ }
+ }
+
+ target.setStart( fieldName, literal.getValue(), true );
+ }
+
+
+ /*
+ * Sets the start of the property's range on the current slice node, passing false as the final
+ * argument (presumably the inclusive flag -- confirm against SliceNode).
+ *
+ * @see QueryVisitor#visit(GreaterThan)
+ */
+ @Override
+ public void visit( GreaterThan op ) throws NoIndexException {
+ final String propertyName = op.getProperty().getValue();
+
+ checkIndexed( propertyName );
+
+ final SliceNode target = getUnionNode( op );
+ target.setStart( propertyName, op.getLiteral().getValue(), false );
+ }
+
+
+ /*
+ * Sets the start of the property's range on the current slice node, passing true as the final
+ * argument (presumably the inclusive flag -- confirm against SliceNode).
+ *
+ * @see QueryVisitor#visit(GreaterThanEqual)
+ */
+ @Override
+ public void visit( GreaterThanEqual op ) throws NoIndexException {
+ final String propertyName = op.getProperty().getValue();
+
+ checkIndexed( propertyName );
+
+ final SliceNode target = getUnionNode( op );
+ target.setStart( propertyName, op.getLiteral().getValue(), true );
+ }
+
+
+ /**
+ * Return the slice node on top of the stack if there is one, so that multiple 'AND' operations and ranges
+ * can be compressed into a single node. Otherwise a new slice node is created and pushed to the stack.
+ *
+ * @param current The current operand node (not consulted by this method)
+ *
+ * @return the slice node to add the operand's range to
+ */
+ private SliceNode getUnionNode( EqualityOperand current ) {
+
+ // reuse the top of the stack only when the stack is non-empty and its top entry is a slice node
+ if ( nodes.size() != 0 && nodes.peek() instanceof SliceNode ) {
+ return ( SliceNode ) nodes.peek();
+ }
+
+ return newSliceNode();
+ }
+
+
+ /** The new slice node */
+ private SliceNode newSliceNode() {
+ SliceNode sliceNode = new SliceNode( ++contextCount );
+
+ nodes.push( sliceNode );
+
+ return sliceNode;
+ }
+
+
+ /** Create a new slice if one will be required within the context of this node */
+ private void createNewSlice( Operand child ) {
+ if ( child instanceof EqualityOperand || child instanceof AndOperand || child instanceof ContainsOperand ) {
+ newSliceNode();
+ }
+ }
+
+
+ /** @return the slice count reported by the node stack */
+ public int getSliceCount() {
+ final int sliceCount = nodes.getSliceCount();
+ return sliceCount;
+ }
+ }
+
+
+ private static class CountingStack<T> extends Stack<T> {
+
+ private int count = 0;
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+
+ /* (non-Javadoc)
+ * @see java.util.Stack#pop()
+ */
+ @Override
+ public synchronized T pop() {
+ T entry = super.pop();
+
+ if ( entry instanceof SliceNode ) {
+ count += ( ( SliceNode ) entry ).getAllSlices().size();
+ }
+
+ return entry;
+ }
+
+
+ public int getSliceCount() {
+
+ Iterator<T> itr = this.iterator();
+
+ T entry;
+
+ while ( itr.hasNext() ) {
+ entry = itr.next();
+
+ if ( entry instanceof SliceNode ) {
+ count += ( ( SliceNode ) entry ).getAllSlices().size();
+ }
+ }
+
+ return count;
+ }
+ }
+
+
+ /**
+ * Returns the page size an iterator for the given node should use.
+ *
+ * @param node the node the hint is requested for
+ *
+ * @return the query's result size when the node is the root node, otherwise the pageSizeHint
+ */
+ public int getPageSizeHint( QueryNode node ) {
+ /*****
+ * DO NOT REMOVE THIS PIECE OF CODE!!!!!!!!!!!
+ * It is crucial that the root iterator only needs the result set size per page
+ * otherwise our cursor logic will fail when passing cursor data to the leaf nodes
+ *******/
+ if(node == rootNode){
+ return size;
+ }
+
+ return pageSizeHint;
+ }
+
+
+ /** Generate a slice node with scan ranges for all the properties in our sort cache */
+ private OrderByNode generateSorts( int opCount ) throws NoIndexException {
+
+ // the value is irrelevant since we'll only ever have 1 slice node
+ // if this is called
+ SliceNode slice = new SliceNode( opCount );
+
+ SortPredicate first = sorts.get( 0 );
+
+ String propertyName = first.getPropertyName();
+
+ checkIndexed( propertyName );
+
+ slice.setStart( propertyName, null, true );
+ slice.setFinish( propertyName, null, true );
+
+
+ for ( int i = 1; i < sorts.size(); i++ ) {
+ checkIndexed( sorts.get( i ).getPropertyName() );
+ }
+
+
+ return new OrderByNode( slice, sorts.subList( 1, sorts.size() ), rootNode );
+ }
+
+
+ private void checkIndexed( String propertyName ) throws NoIndexException {
+
+ if ( propertyName == null || propertyName.isEmpty() || ( !SCHEMA.isPropertyIndexed( entityType, propertyName )
+ && collectionInfo != null ) ) {
+ throw new NoIndexException( entityType, propertyName );
+ }
+ }
+
+
+ /** @return the entity manager supplied at construction time */
+ public EntityManager getEntityManager() {
+ return this.em;
+ }
+}