You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/08/19 00:51:01 UTC

usergrid git commit: Enhance Map_Keys and give the ability to properly page the keys of a map.

Repository: usergrid
Updated Branches:
  refs/heads/master ceb50ff45 -> fe197af23


Enhance Map_Keys and give the ability to properly page the keys of a map.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fe197af2
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fe197af2
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fe197af2

Branch: refs/heads/master
Commit: fe197af239445349ca405ec7a63621c49d34fa65
Parents: ceb50ff
Author: Michael Russo <mr...@apigee.com>
Authored: Thu Aug 18 17:50:30 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Thu Aug 18 17:50:30 2016 -0700

----------------------------------------------------------------------
 .../core/migration/util/AstayanxUtils.java      |  49 -------
 .../usergrid/persistence/map/MapKeyResults.java |  42 ++++++
 .../usergrid/persistence/map/MapManager.java    |   7 +
 .../persistence/map/impl/MapManagerImpl.java    |   8 ++
 .../persistence/map/impl/MapSerialization.java  |  11 ++
 .../map/impl/MapSerializationImpl.java          |  84 ++++++++---
 .../persistence/map/MapManagerTest.java         | 139 ++++++++++++++++++-
 7 files changed, 268 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe197af2/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/util/AstayanxUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/util/AstayanxUtils.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/util/AstayanxUtils.java
deleted file mode 100644
index 7ae4748..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/util/AstayanxUtils.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.core.migration.util;
-
-
-import com.netflix.astyanax.connectionpool.exceptions.BadRequestException;
-
-
-public class AstayanxUtils {
-
-    /**
-     * Return true if the exception is an instance of a missing keysapce
-     * @param rethrowMessage The message to add to the exception if rethrown
-     * @param cassandraException The exception from cassandar
-     * @return
-     */
-    public static void isKeyspaceMissing(final String rethrowMessage,  final Exception cassandraException ) {
-
-        if ( cassandraException instanceof BadRequestException ) {
-
-            //check if it's b/c the keyspace is missing, if so
-            final String message = cassandraException.getMessage();
-
-            //no op, just swallow
-            if(message.contains( "why:Keyspace" ) && message.contains( "does not exist" )){
-                return;
-            };
-        }
-
-       throw new RuntimeException( rethrowMessage, cassandraException );
-    }
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe197af2/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapKeyResults.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapKeyResults.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapKeyResults.java
new file mode 100644
index 0000000..0d34856
--- /dev/null
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapKeyResults.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.map;
+
+import java.util.List;
+
+
+/** Holds one page of map keys plus the cursor used to request the following page. */
+public class MapKeyResults {
+    private final String cursor;
+    private final List<String> keys;
+    /** Creates a page from the cursor for the next page (may be null) and the keys of this page. */
+    public MapKeyResults(final String cursor, final List<String> keys){
+        this.cursor = cursor;
+        this.keys = keys;
+    }
+
+    /** @return the cursor given at construction; null when the producer supplied none */
+    public String getCursor() {
+        return cursor;
+    }
+
+    /** @return the keys contained in this page, exactly as supplied at construction (not copied) */
+    public List<String> getKeys() {
+        return keys;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe197af2/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
index 80e2d17..5fffaca 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
@@ -90,4 +90,11 @@ public interface MapManager {
      * @param key The key used to delete the entry
      */
     void delete( final String key );
+
+    /**
+     * Return a page of keys that exist within the map, together with a cursor for the next page
+     * @param cursor Paging cursor returned by a previous call, or null to start from the first page
+     * @param limit Maximum number of keys to return in this page
+     */
+    MapKeyResults getKeys(final String cursor, final int limit);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe197af2/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
index cafa4a1..86e4001 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
@@ -19,9 +19,11 @@ package org.apache.usergrid.persistence.map.impl;
 
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.map.MapKeyResults;
 import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.persistence.map.MapScope;
 
@@ -110,6 +112,12 @@ public class MapManagerImpl implements MapManager {
         mapSerialization.delete(scope,key);
     }
 
+    /** Returns one page of keys by delegating to the serialization layer with this manager's scope. */
+    @Override
+    public MapKeyResults getKeys(final String cursor, final int limit){
+        return mapSerialization.getAllKeys(scope, cursor, limit);
+    }
+
 
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe197af2/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
index e9c21d2..a60f5ac 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
@@ -21,10 +21,12 @@ package org.apache.usergrid.persistence.map.impl;
 
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
+import org.apache.usergrid.persistence.map.MapKeyResults;
 import org.apache.usergrid.persistence.map.MapScope;
 
 
@@ -87,4 +89,13 @@ public interface MapSerialization extends Migration {
      * @param key The key used to delete the entry
      */
     void delete( final MapScope scope, final String key );
+
+    /**
+     * Get a page of keys for the given map scope.
+     * @param mapScope The scope identifying the map whose keys are listed
+     * @param cursor Optional pagingState returned by a previous call
+     * @param limit number of keys to return
+     * @return The keys of the page together with the cursor for the next page
+     */
+    MapKeyResults getAllKeys(final MapScope mapScope, final String cursor, final int limit);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe197af2/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index 735f2b8..282974c 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.core.datastax.CQLUtils;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.core.shard.ExpandingShardLocator;
 import org.apache.usergrid.persistence.core.shard.StringHashUtils;
+import org.apache.usergrid.persistence.map.MapKeyResults;
 import org.apache.usergrid.persistence.map.MapScope;
 
 import com.google.common.base.Preconditions;
@@ -41,6 +42,8 @@ import com.google.common.hash.Funnel;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
+import static org.apache.commons.lang.StringUtils.isBlank;
+
 
 @Singleton
 public class MapSerializationImpl implements MapSerialization {
@@ -67,7 +70,7 @@ public class MapSerializationImpl implements MapSerialization {
             put( "column1", DataType.Name.BLOB );
             put( "value", DataType.Name.BLOB ); }};
     private static final Map<String, String> MAP_KEYS_CLUSTERING_ORDER =
-        new HashMap<String, String>(){{ put( "column1", "ASC" ); }};
+        new HashMap<String, String>(){{ put( "column1", "DESC" ); }};
 
 
 
@@ -162,12 +165,12 @@ public class MapSerializationImpl implements MapSerialization {
                 .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED));
 
 
-            final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
+            final int bucket = BUCKET_LOCATOR.getCurrentBucket( scope.getName() );
             mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE)
                 .using(timeToLive)
-                .value("key", getMapKeyPartitionKey(scope, key, bucket))
-                .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED))
-                .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED));
+                .value("key", getMapKeyPartitionKey(scope, bucket))
+                .value("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED))
+                .value("value", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED));
         }else{
 
             mapEntry = QueryBuilder.insertInto(MAP_ENTRIES_TABLE)
@@ -176,12 +179,12 @@ public class MapSerializationImpl implements MapSerialization {
                 .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED));
 
             // get a bucket number for the map keys table
-            final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
+            final int bucket = BUCKET_LOCATOR.getCurrentBucket( scope.getName() );
 
             mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE)
-                .value("key", getMapKeyPartitionKey(scope, key, bucket))
-                .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED))
-                .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED));
+                .value("key", getMapKeyPartitionKey(scope, bucket))
+                .value("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED))
+                .value("value", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED));
 
         }
 
@@ -216,10 +219,10 @@ public class MapSerializationImpl implements MapSerialization {
         session.execute(mapEntry);
 
 
-        final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
+        final int bucket = BUCKET_LOCATOR.getCurrentBucket( scope.getName() );
         Statement mapKey;
         mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE)
-            .value("key", getMapKeyPartitionKey(scope, key, bucket))
+            .value("key", getMapKeyPartitionKey(scope, bucket))
             .value("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED))
             .value("value", DataType.serializeValue(null, ProtocolVersion.NEWEST_SUPPORTED));
 
@@ -253,12 +256,12 @@ public class MapSerializationImpl implements MapSerialization {
         session.execute(mapEntry);
 
 
-        final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
+        final int bucket = BUCKET_LOCATOR.getCurrentBucket( scope.getName() );
         Statement mapKey;
         mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE)
-            .value("key", getMapKeyPartitionKey(scope, key, bucket))
+            .value("key", getMapKeyPartitionKey(scope, bucket))
             .value("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED))
-            .value("value", DataType.serializeValue(null, ProtocolVersion.NEWEST_SUPPORTED));
+            .value("value", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED));
 
         session.execute(mapKey);
     }
@@ -276,16 +279,17 @@ public class MapSerializationImpl implements MapSerialization {
 
 
         // not sure which bucket the value is in, execute a delete against them all
-        final int[] buckets = BUCKET_LOCATOR.getAllBuckets( key );
+        final int[] buckets = BUCKET_LOCATOR.getAllBuckets( scope.getName() );
         List<ByteBuffer> mapKeys = new ArrayList<>();
         for( int bucket :  buckets){
-            mapKeys.add( getMapKeyPartitionKey(scope, key, bucket));
+            mapKeys.add( getMapKeyPartitionKey(scope, bucket));
         }
 
         Statement deleteMapKey;
         Clause inKey = QueryBuilder.in("key", mapKeys);
+        Clause column1Equals = QueryBuilder.eq("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED));
         deleteMapKey = QueryBuilder.delete().from(MAP_KEYS_TABLE)
-            .where(inKey);
+            .where(inKey).and(column1Equals);
         session.execute(deleteMapKey);
 
 
@@ -318,6 +322,48 @@ public class MapSerializationImpl implements MapSerialization {
 
     }
 
+    /**
+     * Read one page of keys for the given map scope from the map keys table.
+     *
+     * @param scope  the map whose keys are listed
+     * @param cursor paging state returned by a previous call; null or blank starts from the beginning
+     * @param limit  maximum number of keys to return; also used as the query fetch size
+     * @return the keys read plus the paging state for the next page (null cursor when the driver reports none)
+     */
+    @Override
+    public MapKeyResults getAllKeys(final MapScope scope, final String cursor, final int limit ){
+
+        // writes pick a bucket per scope name, so a key may sit in any bucket: query every bucket partition
+        final int[] buckets = BUCKET_LOCATOR.getAllBuckets( scope.getName() );
+        final List<ByteBuffer> partitionKeys = new ArrayList<>(buckets.length);
+        for (int bucket : buckets) {
+            partitionKeys.add(getMapKeyPartitionKey(scope, bucket));
+        }
+
+        Statement statement = QueryBuilder.select().all().from(MAP_KEYS_TABLE)
+            .where(QueryBuilder.in("key", partitionKeys))
+            .setFetchSize(limit);
+
+        // resume from the caller's position only when a cursor was supplied
+        if( !isBlank(cursor) ){
+            statement = statement.setPagingState(PagingState.fromString(cursor));
+        }
+
+        final ResultSet resultSet = session.execute(statement);
+        // capture the paging state of the page just fetched, before any rows are consumed
+        final PagingState pagingState = resultSet.getExecutionInfo().getPagingState();
+
+        final List<String> keys = new ArrayList<>();
+        final Iterator<Row> resultIterator = resultSet.iterator();
+        // test the limit first: once the page is drained, hasNext() would fetch the following page needlessly
+        while( keys.size() < limit && resultIterator.hasNext() ){
+            keys.add((String)DataType.text().deserialize(
+                resultIterator.next().getBytes("column1"), ProtocolVersion.NEWEST_SUPPORTED));
+        }
+
+        return new MapKeyResults(pagingState != null ? pagingState.toString() : null, keys);
+
+    }
 
     private ByteBuffer getValueCQL( MapScope scope, String key, final ConsistencyLevel consistencyLevel ) {
 
@@ -456,10 +502,10 @@ public class MapSerializationImpl implements MapSerialization {
 
     }
 
-    private ByteBuffer getMapKeyPartitionKey(MapScope scope, String key, int bucketNumber){
+    private ByteBuffer getMapKeyPartitionKey(MapScope scope, int bucketNumber){
 
         return serializeKeys(scope.getApplication().getUuid(),
-            scope.getApplication().getType(), scope.getName(), key, bucketNumber);
+            scope.getApplication().getType(), scope.getName(), "", bucketNumber);
 
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe197af2/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
index 2a68247..254c915 100644
--- a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
+++ b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
@@ -20,10 +20,7 @@
 package org.apache.usergrid.persistence.map;
 
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 import org.junit.Before;
@@ -83,6 +80,140 @@ public class MapManagerTest {
     }
 
     @Test
+    public void getAllKeys(){
+
+        // two keys written, page size larger than the key count: a single page must hold both
+        MapManager mm = mmf.createMapManager(this.scope);
+
+        final String key1 = "key1";
+        final String key2 = "key2";
+        final String value = "value";
+
+        mm.putString( key1, value );
+        mm.putString( key2, value );
+
+        MapKeyResults keyResults = mm.getKeys(null, 5);
+        assertEquals(2, keyResults.getKeys().size());
+
+    }
+
+    @Test
+    public void getAllKeysWithStart(){
+
+        // Writes six keys, then reads them back as two pages of three, resuming the
+        // second read from the cursor handed back by the first.
+
+        final MapManager manager = mmf.createMapManager(this.scope);
+
+        final String payload = "value";
+
+        // key1 .. key6, written in ascending order
+        for ( int index = 1; index <= 6; index++ ) {
+            manager.putString( "key" + index, payload );
+        }
+
+        // first page: no cursor supplied
+        final MapKeyResults firstPage = manager.getKeys(null, 3);
+        final List<String> firstKeys = firstPage.getKeys();
+
+        assertEquals(3, firstKeys.size());
+        assertEquals("key1", firstKeys.get(0));
+
+        // a cursor must be returned so the next page can be requested
+        final String nextCursor = firstPage.getCursor();
+        assertNotNull(nextCursor);
+
+        // second page: continue from the returned cursor
+        final MapKeyResults secondPage = manager.getKeys(nextCursor, 3);
+        final List<String> secondKeys = secondPage.getKeys();
+
+        assertEquals(3, secondKeys.size());
+        assertEquals("key4", secondKeys.get(0));
+
+
+    }
+
+    @Test
+    public void getAllKeysAfterDelete(){
+
+        // Writes six keys, deletes the first three, and checks that a fresh key
+        // listing only reports the three survivors.
+
+        final MapManager manager = mmf.createMapManager(this.scope);
+
+        final String payload = "value";
+
+        // key1 .. key6, written in ascending order
+        for ( int index = 1; index <= 6; index++ ) {
+            manager.putString( "key" + index, payload );
+        }
+
+        // before any delete a single page of six holds every key
+        final MapKeyResults beforeDelete = manager.getKeys(null, 6);
+        final List<String> keysBeforeDelete = beforeDelete.getKeys();
+
+        assertEquals(6, keysBeforeDelete.size());
+        assertEquals("key1", keysBeforeDelete.get(0));
+
+
+        // remove key1 .. key3
+        for ( int index = 1; index <= 3; index++ ) {
+            manager.delete( "key" + index );
+        }
+
+        // list again from the very start: the cursor of the earlier read is
+        // deliberately not reused here
+        final MapKeyResults afterDelete = manager.getKeys(null, 6);
+        final List<String> keysAfterDelete = afterDelete.getKeys();
+
+        assertEquals(3, keysAfterDelete.size());
+        assertEquals("key4", keysAfterDelete.get(0));
+
+
+    }
+
+    @Test
+    public void getAllKeysAfterMassDelete(){
+
+        // Pages through a large key set, then deletes all but the last 500 keys and
+        // verifies that a fresh listing reports exactly the survivors.
+        MapManager mm = mmf.createMapManager(this.scope);
+
+        final String value = "value";
+        final int count = 11000;
+        final int remaining = 500;
+
+        // the loop bound is the same constant the assertions use, so they cannot drift apart
+        for ( int i=0; i < count; i++){
+            mm.putString("key"+i, value);
+        }
+
+        boolean done = false;
+        String cursor = null;
+        int total = 0;
+
+        // follow the cursor until none is returned or every written key has been counted
+        while(!done && total < count){
+
+            MapKeyResults keyResults = mm.getKeys(cursor, 1000);
+
+            cursor = keyResults.getCursor();
+            done = cursor == null;
+            total += keyResults.getKeys().size();
+
+        }
+
+        assertEquals(count, total);
+
+        for ( int i=0; i < count - remaining; i++){
+            mm.delete("key"+i);
+        }
+
+        MapKeyResults keyResults2 = mm.getKeys(null, 1000);
+        assertEquals(remaining, keyResults2.getKeys().size());
+    }
+
+    @Test
     public void writeReadStringWithLongKey() {
         MapManager mm = mmf.createMapManager( this.scope );