You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/18 01:24:01 UTC
[1/2] incubator-usergrid git commit: Revert "fix for list types changing from string to int or whatever"
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-405-merge 762e43e1e -> 5e6ed4a12
Revert "fix for list types changing from string to int or whatever"
This reverts commit e5cab76f8b8fc1e4a60ea23274e7a8a0cc79f759.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b09dc435
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b09dc435
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b09dc435
Branch: refs/heads/USERGRID-405-merge
Commit: b09dc43526cd27dcf75fa09dc5c40a288ca30225
Parents: 762e43e
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 17 17:30:27 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 17 17:30:27 2015 -0600
----------------------------------------------------------------------
.../index/impl/EsEntityIndexBatchImpl.java | 14 ++-
.../persistence/index/impl/IndexingUtils.java | 109 +++++++++----------
.../persistence/index/impl/EntityIndexTest.java | 38 +------
3 files changed, 70 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b09dc435/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 4c32cc1..92312d2 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -56,7 +56,15 @@ import com.codahale.metrics.Timer;
import rx.Observable;
import rx.functions.Func1;
-import static org.apache.usergrid.persistence.index.impl.IndexingUtils.*;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ANALYZED_STRING_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.BOOLEAN_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ENTITYID_ID_FIELDNAME;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ENTITY_CONTEXT_FIELDNAME;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.GEO_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.NUMBER_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.STRING_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createContextName;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexDocId;
public class EsEntityIndexBatchImpl implements EntityIndexBatch {
@@ -235,7 +243,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
if ( f instanceof ListField ) {
List list = ( List ) field.getValue();
- entityMap.put(LIST_PREFIX + field.getName().toLowerCase(),
+ entityMap.put( field.getName().toLowerCase(),
new ArrayList( processCollectionForMap( list ) ) );
if ( !list.isEmpty() ) {
@@ -247,7 +255,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
}
else if ( f instanceof ArrayField ) {
List list = ( List ) field.getValue();
- entityMap.put(ARRAY_PREFIX + field.getName().toLowerCase(),
+ entityMap.put( field.getName().toLowerCase(),
new ArrayList( processCollectionForMap( list ) ) );
}
else if ( f instanceof SetField ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b09dc435/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
index 41a4680..ffd98e9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
@@ -34,9 +34,6 @@ public class IndexingUtils {
public static final String STRING_PREFIX = "su_";
public static final String ANALYZED_STRING_PREFIX = "sa_";
- public static final String ARRAY_PREFIX = "ar_";
- public static final String LIST_PREFIX = "li_";
-
public static final String GEO_PREFIX = "go_";
public static final String NUMBER_PREFIX = "nu_";
public static final String BOOLEAN_PREFIX = "bu_";
@@ -156,75 +153,75 @@ public class IndexingUtils {
/** add routing "_routing":{ "required":false, "path":"ug_entityId" **/
.startObject("_routing").field("required",true).field("path",ENTITYID_ID_FIELDNAME).endObject()
- .startArray("dynamic_templates")
+ .startArray("dynamic_templates")
// we need most specific mappings first since it's a stop on match algorithm
- .startObject()
+ .startObject()
- .startObject("entity_id_template" )
- .field("match", IndexingUtils.ENTITYID_ID_FIELDNAME )
- .field("match_mapping_type", "string" )
- .startObject( "mapping").field("type", "string" )
- .field("index", "not_analyzed" )
- .endObject()
- .endObject()
- .endObject()
+ .startObject( "entity_id_template" )
+ .field( "match", IndexingUtils.ENTITYID_ID_FIELDNAME )
+ .field( "match_mapping_type", "string" )
+ .startObject( "mapping" ).field( "type", "string" )
+ .field( "index", "not_analyzed" )
+ .endObject()
+ .endObject()
+ .endObject()
- .startObject()
- .startObject("entity_context_template" )
+ .startObject()
+ .startObject( "entity_context_template" )
.field( "match", IndexingUtils.ENTITY_CONTEXT_FIELDNAME )
- .field("match_mapping_type", "string" )
- .startObject( "mapping" )
+ .field( "match_mapping_type", "string" )
+ .startObject( "mapping" )
.field( "type", "string" )
- .field("index", "not_analyzed").endObject()
- .endObject()
- .endObject()
-
- .startObject()
- .startObject("entity_version_template" )
- .field("match", IndexingUtils.ENTITY_VERSION_FIELDNAME )
- .field("match_mapping_type", "string" )
- .startObject( "mapping").field("type", "long" )
- .endObject()
- .endObject()
- .endObject()
+ .field( "index", "not_analyzed" ).endObject()
+ .endObject()
+ .endObject()
+
+ .startObject()
+ .startObject( "entity_version_template" )
+ .field( "match", IndexingUtils.ENTITY_VERSION_FIELDNAME )
+ .field( "match_mapping_type", "string" )
+ .startObject( "mapping" ).field( "type", "long" )
+ .endObject()
+ .endObject()
+ .endObject()
// any string with field name that starts with sa_ gets analyzed
- .startObject()
- .startObject("template_1" )
+ .startObject()
+ .startObject( "template_1" )
.field( "match", ANALYZED_STRING_PREFIX + "*" )
- .field("match_mapping_type", "string" ).startObject( "mapping" )
+ .field( "match_mapping_type", "string" ).startObject( "mapping" )
.field( "type", "string" )
- .field("index", "analyzed" )
- .endObject()
- .endObject()
+ .field( "index", "analyzed" )
+ .endObject()
+ .endObject()
- .endObject()
+ .endObject()
// all other strings are not analyzed
- .startObject()
- .startObject("template_2" )
+ .startObject()
+ .startObject( "template_2" )
//todo, should be string prefix, remove 2 field mapping
- .field("match", "*" )
- .field("match_mapping_type", "string" )
- .startObject( "mapping" )
- .field("type", "string" )
- .field("index", "not_analyzed" )
- .endObject()
- .endObject()
- .endObject()
+ .field( "match", "*" )
+ .field( "match_mapping_type", "string" )
+ .startObject( "mapping" )
+ .field( "type", "string" )
+ .field( "index", "not_analyzed" )
+ .endObject()
+ .endObject()
+ .endObject()
// fields names starting with go_ get geo-indexed
- .startObject()
- .startObject("template_3" )
- .field("match", GEO_PREFIX + "location" )
- .startObject( "mapping" )
- .field("type", "geo_point" )
- .endObject()
- .endObject()
- .endObject()
-
- .endArray()
+ .startObject()
+ .startObject( "template_3" )
+ .field( "match", GEO_PREFIX + "location" )
+ .startObject( "mapping" )
+ .field( "type", "geo_point" )
+ .endObject()
+ .endObject()
+ .endObject()
+
+ .endArray()
.endObject();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b09dc435/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 9cb5297..ca9bf79 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -26,7 +26,9 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
import org.apache.usergrid.persistence.index.*;
-import org.apache.usergrid.persistence.model.field.*;
+import org.apache.usergrid.persistence.model.field.ArrayField;
+import org.apache.usergrid.persistence.model.field.EntityObjectField;
+import org.apache.usergrid.persistence.model.field.UUIDField;
import org.apache.usergrid.persistence.model.field.value.EntityObject;
import org.junit.Ignore;
import org.junit.Rule;
@@ -49,6 +51,7 @@ import org.apache.usergrid.persistence.index.utils.UUIDUtils;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.StringField;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -96,7 +99,7 @@ public class EntityIndexTest extends BaseIT {
}
@Test
-// @Ignore("this is a problem i will work on when i can breathe")
+ @Ignore("this is a problem i will work on when i can breathe")
public void testIndexVariations() throws IOException {
Id appId = new SimpleId( "application" );
@@ -116,8 +119,6 @@ public class EntityIndexTest extends BaseIT {
batch.index(indexScope, entity);
batch.execute().get();
- entity = new Entity( entityType );
- entity.setField(new UUIDField(IndexingUtils.ENTITYID_ID_FIELDNAME, UUID.randomUUID()));
EntityUtils.setVersion(entity, UUIDGenerator.newTimeUUID());
List<String> list = new ArrayList<>();
list.add("test");
@@ -125,9 +126,6 @@ public class EntityIndexTest extends BaseIT {
batch.index(indexScope, entity);
batch.execute().get();
-
- entity = new Entity( entityType );
- entity.setField(new UUIDField(IndexingUtils.ENTITYID_ID_FIELDNAME, UUID.randomUUID()));
EntityUtils.setVersion(entity, UUIDGenerator.newTimeUUID());
EntityObject testObj = new EntityObject();
testObj.setField(new StringField("test","testFiedl"));
@@ -135,32 +133,9 @@ public class EntityIndexTest extends BaseIT {
batch.index(indexScope, entity);
batch.execute().get();
- entity = new Entity( entityType );
- entity.setField(new UUIDField(IndexingUtils.ENTITYID_ID_FIELDNAME, UUID.randomUUID()));
- EntityUtils.setVersion(entity, UUIDGenerator.newTimeUUID());
- List<Integer> listint = new ArrayList<>();
- listint.add(0);
- entity.setField(new ArrayField<Integer>("testfield", listint));
- batch.index(indexScope, entity);
- batch.execute().get();
-
- entity = new Entity( entityType );
- entity.setField(new UUIDField(IndexingUtils.ENTITYID_ID_FIELDNAME, UUID.randomUUID()));
- EntityUtils.setVersion(entity, UUIDGenerator.newTimeUUID());
- List<EntityObject> listObj = new ArrayList<>();
- EntityObject listObjField = new EntityObject();
- listObjField.setField(new StringField("testasf","somevalue"));
- listObj.add(listObjField);
- listObjField = new EntityObject();
- listObjField.setField(new IntegerField("testasf",0));
- listObj.add(listObjField);
- entity.setField(new ArrayField<EntityObject>("testfield", listObj));
- batch.index(indexScope, entity);
- batch.execute().get();
-
entityIndex.refresh();
- testQuery(indexScope, searchTypes, entityIndex, "select *", 5);
+ testQueries( indexScope, searchTypes, entityIndex );
}
@Test
@@ -288,7 +263,6 @@ public class EntityIndexTest extends BaseIT {
entityIndex.refresh();
//Hilda Youn
-
testQuery(indexScope, searchTypes, entityIndex, "name = 'Bowers Oneil'", 0);
}
[2/2] incubator-usergrid git commit: Removed Hystrix. Causing thread OOM issues and not really used in production.
Posted by to...@apache.org.
Removed Hystrix. Causing thread OOM issues and not really used in production.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5e6ed4a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5e6ed4a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5e6ed4a1
Branch: refs/heads/USERGRID-405-merge
Commit: 5e6ed4a12c7aa154f1411d85c6fcce56fbcd4367
Parents: b09dc43
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 17 18:23:58 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 17 18:23:58 2015 -0600
----------------------------------------------------------------------
.../core/astyanax/MultiRowColumnIterator.java | 12 ++-
.../core/hystrix/HystrixCassandra.java | 94 --------------------
.../persistence/core/task/TaskExecutor.java | 4 +-
.../graph/impl/GraphManagerImpl.java | 30 +++++--
.../graph/impl/stage/EdgeDeleteRepairImpl.java | 9 +-
.../graph/impl/stage/EdgeMetaRepairImpl.java | 16 +++-
.../impl/stage/NodeDeleteListenerImpl.java | 16 +++-
.../impl/NodeSerializationImpl.java | 36 ++++----
.../shard/count/NodeShardApproximationImpl.java | 4 +-
.../NodeShardCounterSerializationImpl.java | 25 +++---
.../shard/impl/NodeShardAllocationImpl.java | 16 +++-
.../shard/impl/ShardGroupCompactionImpl.java | 64 ++++---------
12 files changed, 128 insertions(+), 198 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index fdc4768..667992c 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -33,10 +33,8 @@ import java.util.NoSuchElementException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
-
import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
@@ -184,7 +182,13 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( rowKeys )
.withColumnRange( rangeBuilder.build() );
- final Rows<R, C> result = HystrixCassandra.user( query ).getResult();
+ final Rows<R, C> result;
+ try {
+ result = query.execute().getResult();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
//now aggregate them together
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
deleted file mode 100644
index ab71782..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.core.hystrix;
-
-
-import com.netflix.astyanax.Execution;
-import com.netflix.astyanax.connectionpool.OperationResult;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.hystrix.HystrixCommand;
-import com.netflix.hystrix.HystrixCommandGroupKey;
-import com.netflix.hystrix.HystrixThreadPoolProperties;
-
-
-/**
- * A utility class that creates graph observables wrapped in Hystrix for timeouts and circuit breakers.
- */
-public class HystrixCassandra {
-
-
-
-
- /**
- * Command group used for realtime user commands
- */
- public static final HystrixCommand.Setter
- USER_GROUP = HystrixCommand.Setter.withGroupKey( HystrixCommandGroupKey.Factory.asKey( "user" ) ).andThreadPoolPropertiesDefaults(
- HystrixThreadPoolProperties.Setter().withCoreSize( 1000 ) );
-
- /**
- * Command group for asynchronous operations
- */
- public static final HystrixCommand.Setter
- ASYNC_GROUP = HystrixCommand.Setter.withGroupKey( HystrixCommandGroupKey.Factory.asKey( "async" ) ).andThreadPoolPropertiesDefaults(
- HystrixThreadPoolProperties.Setter().withCoreSize( 1000 ) );
-
-
- /**
- * Execute an user operation
- */
- public static <R> OperationResult<R> user( final Execution<R> execution) {
- return new HystrixCommand<OperationResult<R>>( USER_GROUP ) {
-
- @Override
- protected OperationResult<R> run() {
- try {
- return execution.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( e );
- }
- }
- }.execute();
- }
-
-
- /**
- * Execute an an async operation
- */
- public static <R> OperationResult<R> async( final Execution<R> execution) {
-
-
- return new HystrixCommand<OperationResult<R>>( ASYNC_GROUP ) {
-
- @Override
- protected OperationResult<R> run() {
- try {
- return execution.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( e );
- }
- }
- }.execute();
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index 5e9aa4c..5728d2e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -25,7 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
/**
* An interface for execution of tasks
*/
-public interface TaskExecutor {
+public interface TaskExecutor {
/**
* Submit the task asynchronously
@@ -37,5 +37,5 @@ public interface TaskExecutor {
* Stop the task executor without waiting for scheduled threads to run
*/
public void shutdown();
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 246bbca..26d06ad 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -25,14 +25,11 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -54,10 +51,12 @@ import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Notification;
import rx.Observable;
@@ -202,7 +201,12 @@ public class GraphManagerImpl implements GraphManager {
mutation.mergeShallow( edgeMutation );
- HystrixCassandra.user( mutation );
+ try {
+ mutation.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation", e );
+ }
return edge;
}
@@ -241,7 +245,12 @@ public class GraphManagerImpl implements GraphManager {
LOG.debug("Marking edge {} as deleted to commit log", edge);
- HystrixCassandra.user(edgeMutation);
+ try {
+ edgeMutation.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation", e );
+ }
//HystrixCassandra.async( edgeDeleteListener.receive( scope, markedEdge,
@@ -285,7 +294,12 @@ public class GraphManagerImpl implements GraphManager {
LOG.debug( "Marking node {} as deleted to node mark", node );
- HystrixCassandra.user( nodeMutation );
+ try {
+ nodeMutation.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation", e );
+ }
//HystrixCassandra.async(nodeDeleteListener.receive(scope, id, eventTimestamp )).subscribeOn(
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
index 0137ba4..7dca0ce 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
@@ -26,7 +26,6 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
@@ -39,6 +38,7 @@ import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.functions.Action1;
@@ -94,7 +94,12 @@ public class EdgeDeleteRepairImpl implements EdgeDeleteRepair {
//remove from storage
- HystrixCassandra.async(storageSerialization.deleteEdge( scope, edge, timestamp ));
+ try {
+ storageSerialization.deleteEdge( scope, edge, timestamp ).execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
}
}
} );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index 0e1c4e2..ab141f7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -47,6 +46,7 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.functions.Action1;
@@ -188,7 +188,12 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
+ "Mutation has {} rows to mutate ",
edgeType, batch.getRowCount() );
- HystrixCassandra.async( batch );
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
}
}
@@ -219,7 +224,12 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
LOG.debug( "Type {} has no subtypes in use as of maxTimestamp {}. Deleting type.", edgeType,
maxTimestamp );
- HystrixCassandra.async( serialization.removeEdgeType( scope, node, edgeType, maxTimestamp ) );
+ try {
+ serialization.removeEdgeType( scope, node, edgeType, maxTimestamp ).execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
}
} );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index f167f0c..e8c224e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -29,7 +29,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
@@ -47,6 +46,7 @@ import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.functions.Action0;
@@ -129,7 +129,12 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
.doOnCompleted( new Action0() {
@Override
public void call() {
- HystrixCassandra.async(nodeSerialization.delete( scope, node, maxVersion.get() ));
+ try {
+ nodeSerialization.delete( scope, node, maxVersion.get()).execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
}
} );
}
@@ -210,7 +215,12 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
targetNodes.add( new TargetPair( edge.getTargetNode(), edge.getType() ) );
}
- HystrixCassandra.async( batch );
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
//now delete meta data
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
index 2cc0391..18062c4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
@@ -38,7 +38,6 @@ import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -52,6 +51,7 @@ import com.google.common.base.Preconditions;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.Row;
@@ -150,22 +150,23 @@ public class NodeSerializationImpl implements NodeSerialization, Migration {
keyspace.prepareQuery( GRAPH_DELETE ).setConsistencyLevel( fig.getReadCL() );
+
+ Column<Boolean> result = null;
try {
- Column<Boolean> result = HystrixCassandra
- .user( query.getKey( ScopedRowKey.fromKey( scope.getApplication(), node ) ).getColumn( COLUMN_NAME ) )
+ result = query.getKey( ScopedRowKey.fromKey( scope.getApplication(), node ) ).getColumn( COLUMN_NAME ).execute()
.getResult();
-
- return Optional.of( result.getLongValue() );
}
- catch (RuntimeException re ) {
- if(re.getCause().getCause() instanceof NotFoundException) {
- //swallow, there's just no column
- return Optional.absent();
- }
-
- throw re;
+ catch(NotFoundException nfe){
+ //swallow, there's just no column
+ return Optional.absent();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
}
+ return Optional.of( result.getLongValue() );
+
+
}
@@ -193,9 +194,14 @@ public class NodeSerializationImpl implements NodeSerialization, Migration {
}
- final Rows<ScopedRowKey<Id>, Boolean> results = HystrixCassandra
- .user( query.getRowSlice( keys ).withColumnSlice( Collections.singletonList( COLUMN_NAME ) ) )
- .getResult();
+ final Rows<ScopedRowKey<Id>, Boolean> results;
+ try {
+ results = query.getRowSlice( keys ).withColumnSlice( Collections.singletonList( COLUMN_NAME )).execute()
+ .getResult();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
for ( Row<ScopedRowKey<Id>, Boolean> row : results ) {
Column<Boolean> column = row.getColumns().getColumnByName( COLUMN_NAME );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
index 47243e5..a47d528 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.consistency.TimeService;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
@@ -39,6 +38,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import com.netflix.astyanax.MutationBatch;
import com.netflix.hystrix.HystrixCommand;
+import com.netflix.hystrix.HystrixCommandGroupKey;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
@@ -229,7 +229,7 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
/**
* Execute the command in hystrix to avoid slamming cassandra
*/
- new HystrixCommand( HystrixCassandra.ASYNC_GROUP ) {
+ new HystrixCommand( HystrixCommandGroupKey.Factory.asKey("BatchCounterRollup") ) {
@Override
protected Void run() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
index 524a0cf..6934275 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
@@ -33,9 +33,8 @@ import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.graph.GraphFig;
@@ -50,6 +49,7 @@ import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.CompositeBuilder;
@@ -117,24 +117,19 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.scope.getApplication(), key );
+ OperationResult<Column<Boolean>> column = null;
try {
- OperationResult<Column<Boolean>> column = HystrixCassandra.user(
- keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( true ) );
-
- return column.getResult().getLongValue();
+ column = keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( true ).execute();
}
//column not found, return 0
- catch ( RuntimeException re ) {
-
- final Throwable cause = re.getCause();
-
- if(cause != null && cause.getCause() instanceof NotFoundException) {
- return 0;
- }
-
- throw re;
+ catch ( NotFoundException nfe ) {
+ return 0;
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to read from cassandra", e );
}
+ return column.getResult().getLongValue();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 0d34b63..ad64338 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -27,7 +27,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.consistency.TimeService;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.graph.GraphFig;
@@ -49,6 +48,7 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.util.TimeUUIDUtils;
@@ -110,7 +110,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, MIN_SHARD, directedEdgeMeta );
- HystrixCassandra.user( batch );
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
existingShards = Collections.singleton( MIN_SHARD ).iterator();
}
@@ -232,7 +237,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
final MutationBatch batch = this.edgeShardSerialization.writeShardMeta( scope, newShard, directedEdgeMeta );
- HystrixCassandra.user( batch );
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
return true;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 9f8efc8..6135121 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -29,13 +29,7 @@ import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
@@ -43,7 +37,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.consistency.TimeService;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.task.Task;
import org.apache.usergrid.persistence.core.task.TaskExecutor;
@@ -74,6 +67,7 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
/**
@@ -207,8 +201,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
if ( edgeCount % maxWorkSize == 0 ) {
try {
- HystrixCassandra.async( newRowBatch );
- HystrixCassandra.async( deleteRowBatch );
+ newRowBatch.execute();
+ deleteRowBatch.execute();
}
catch ( Throwable t ) {
LOG.error( "Unable to move edges from shard {} to shard {}", sourceShard, targetShard );
@@ -219,8 +213,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
try {
- HystrixCassandra.async( newRowBatch );
- HystrixCassandra.async( deleteRowBatch );
+ newRowBatch.execute();
+ deleteRowBatch.execute();
}
catch ( Throwable t ) {
LOG.error( "Unable to move edges to target shard {}", targetShard );
@@ -260,7 +254,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
- HystrixCassandra.async( shardRemovalRollup );
+ try {
+ shardRemovalRollup.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
LOG.info( "Shard has been fully compacted. Marking shard {} as compacted in Cassandra", targetShard );
@@ -268,7 +267,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
//Overwrite our shard index with a newly created one that has been marked as compacted
Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true );
final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta );
- HystrixCassandra.async( updateMark );
+ try {
+ updateMark.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
resultBuilder.withCompactedShard( compactedShard );
}
@@ -530,40 +534,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
- /**
- * Create a thread pool that will reject work if our audit tasks become overwhelmed
- */
- private final class MaxSizeThreadPool extends ThreadPoolExecutor {
-
- public MaxSizeThreadPool( final int workerSize, final int queueLength ) {
- super( 1, workerSize, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>( queueLength ),
- new CompactionThreadFactory(), new RejectionLogger() );
- }
- }
-
-
- private final class CompactionThreadFactory implements ThreadFactory {
-
- private final AtomicLong threadCounter = new AtomicLong();
-
-
- @Override
- public Thread newThread( final Runnable r ) {
- final long newValue = threadCounter.incrementAndGet();
-
- return new Thread( r, "Graph-Shard-Compaction-" + newValue );
- }
- }
-
-
- private final class RejectionLogger implements RejectedExecutionHandler {
-
-
- @Override
- public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
- LOG.warn( "Audit queue full, rejecting audit task {}", r );
- }
- }
public static final class CompactionResult {