You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/19 17:20:16 UTC
[50/50] incubator-usergrid git commit: Removed Hystrix. Causing
thread OOM issues and not really used in production.
Removed Hystrix. Causing thread OOM issues and not really used in production.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5e6ed4a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5e6ed4a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5e6ed4a1
Branch: refs/heads/USERGRID-405
Commit: 5e6ed4a12c7aa154f1411d85c6fcce56fbcd4367
Parents: b09dc43
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 17 18:23:58 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 17 18:23:58 2015 -0600
----------------------------------------------------------------------
.../core/astyanax/MultiRowColumnIterator.java | 12 ++-
.../core/hystrix/HystrixCassandra.java | 94 --------------------
.../persistence/core/task/TaskExecutor.java | 4 +-
.../graph/impl/GraphManagerImpl.java | 30 +++++--
.../graph/impl/stage/EdgeDeleteRepairImpl.java | 9 +-
.../graph/impl/stage/EdgeMetaRepairImpl.java | 16 +++-
.../impl/stage/NodeDeleteListenerImpl.java | 16 +++-
.../impl/NodeSerializationImpl.java | 36 ++++----
.../shard/count/NodeShardApproximationImpl.java | 4 +-
.../NodeShardCounterSerializationImpl.java | 25 +++---
.../shard/impl/NodeShardAllocationImpl.java | 16 +++-
.../shard/impl/ShardGroupCompactionImpl.java | 64 ++++---------
12 files changed, 128 insertions(+), 198 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index fdc4768..667992c 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -33,10 +33,8 @@ import java.util.NoSuchElementException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
-
import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
@@ -184,7 +182,13 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( rowKeys )
.withColumnRange( rangeBuilder.build() );
- final Rows<R, C> result = HystrixCassandra.user( query ).getResult();
+ final Rows<R, C> result;
+ try {
+ result = query.execute().getResult();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
//now aggregate them together
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
deleted file mode 100644
index ab71782..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.core.hystrix;
-
-
-import com.netflix.astyanax.Execution;
-import com.netflix.astyanax.connectionpool.OperationResult;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.hystrix.HystrixCommand;
-import com.netflix.hystrix.HystrixCommandGroupKey;
-import com.netflix.hystrix.HystrixThreadPoolProperties;
-
-
-/**
- * A utility class that creates graph observables wrapped in Hystrix for timeouts and circuit breakers.
- */
-public class HystrixCassandra {
-
-
-
-
- /**
- * Command group used for realtime user commands
- */
- public static final HystrixCommand.Setter
- USER_GROUP = HystrixCommand.Setter.withGroupKey( HystrixCommandGroupKey.Factory.asKey( "user" ) ).andThreadPoolPropertiesDefaults(
- HystrixThreadPoolProperties.Setter().withCoreSize( 1000 ) );
-
- /**
- * Command group for asynchronous operations
- */
- public static final HystrixCommand.Setter
- ASYNC_GROUP = HystrixCommand.Setter.withGroupKey( HystrixCommandGroupKey.Factory.asKey( "async" ) ).andThreadPoolPropertiesDefaults(
- HystrixThreadPoolProperties.Setter().withCoreSize( 1000 ) );
-
-
- /**
- * Execute an user operation
- */
- public static <R> OperationResult<R> user( final Execution<R> execution) {
- return new HystrixCommand<OperationResult<R>>( USER_GROUP ) {
-
- @Override
- protected OperationResult<R> run() {
- try {
- return execution.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( e );
- }
- }
- }.execute();
- }
-
-
- /**
- * Execute an an async operation
- */
- public static <R> OperationResult<R> async( final Execution<R> execution) {
-
-
- return new HystrixCommand<OperationResult<R>>( ASYNC_GROUP ) {
-
- @Override
- protected OperationResult<R> run() {
- try {
- return execution.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( e );
- }
- }
- }.execute();
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index 5e9aa4c..5728d2e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -25,7 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
/**
* An interface for execution of tasks
*/
-public interface TaskExecutor {
+public interface TaskExecutor {
/**
* Submit the task asynchronously
@@ -37,5 +37,5 @@ public interface TaskExecutor {
* Stop the task executor without waiting for scheduled threads to run
*/
public void shutdown();
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 246bbca..26d06ad 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -25,14 +25,11 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -54,10 +51,12 @@ import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Notification;
import rx.Observable;
@@ -202,7 +201,12 @@ public class GraphManagerImpl implements GraphManager {
mutation.mergeShallow( edgeMutation );
- HystrixCassandra.user( mutation );
+ try {
+ mutation.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation", e );
+ }
return edge;
}
@@ -241,7 +245,12 @@ public class GraphManagerImpl implements GraphManager {
LOG.debug("Marking edge {} as deleted to commit log", edge);
- HystrixCassandra.user(edgeMutation);
+ try {
+ edgeMutation.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation", e );
+ }
//HystrixCassandra.async( edgeDeleteListener.receive( scope, markedEdge,
@@ -285,7 +294,12 @@ public class GraphManagerImpl implements GraphManager {
LOG.debug( "Marking node {} as deleted to node mark", node );
- HystrixCassandra.user( nodeMutation );
+ try {
+ nodeMutation.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation", e );
+ }
//HystrixCassandra.async(nodeDeleteListener.receive(scope, id, eventTimestamp )).subscribeOn(
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
index 0137ba4..7dca0ce 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
@@ -26,7 +26,6 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
@@ -39,6 +38,7 @@ import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.functions.Action1;
@@ -94,7 +94,12 @@ public class EdgeDeleteRepairImpl implements EdgeDeleteRepair {
//remove from storage
- HystrixCassandra.async(storageSerialization.deleteEdge( scope, edge, timestamp ));
+ try {
+ storageSerialization.deleteEdge( scope, edge, timestamp ).execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
}
}
} );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index 0e1c4e2..ab141f7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -47,6 +46,7 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.functions.Action1;
@@ -188,7 +188,12 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
+ "Mutation has {} rows to mutate ",
edgeType, batch.getRowCount() );
- HystrixCassandra.async( batch );
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
}
}
@@ -219,7 +224,12 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
LOG.debug( "Type {} has no subtypes in use as of maxTimestamp {}. Deleting type.", edgeType,
maxTimestamp );
- HystrixCassandra.async( serialization.removeEdgeType( scope, node, edgeType, maxTimestamp ) );
+ try {
+ serialization.removeEdgeType( scope, node, edgeType, maxTimestamp ).execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
}
} );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index f167f0c..e8c224e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -29,7 +29,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
@@ -47,6 +46,7 @@ import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.functions.Action0;
@@ -129,7 +129,12 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
.doOnCompleted( new Action0() {
@Override
public void call() {
- HystrixCassandra.async(nodeSerialization.delete( scope, node, maxVersion.get() ));
+ try {
+ nodeSerialization.delete( scope, node, maxVersion.get()).execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
}
} );
}
@@ -210,7 +215,12 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
targetNodes.add( new TargetPair( edge.getTargetNode(), edge.getType() ) );
}
- HystrixCassandra.async( batch );
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
//now delete meta data
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
index 2cc0391..18062c4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
@@ -38,7 +38,6 @@ import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -52,6 +51,7 @@ import com.google.common.base.Preconditions;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.Row;
@@ -150,22 +150,23 @@ public class NodeSerializationImpl implements NodeSerialization, Migration {
keyspace.prepareQuery( GRAPH_DELETE ).setConsistencyLevel( fig.getReadCL() );
+
+ Column<Boolean> result = null;
try {
- Column<Boolean> result = HystrixCassandra
- .user( query.getKey( ScopedRowKey.fromKey( scope.getApplication(), node ) ).getColumn( COLUMN_NAME ) )
+ result = query.getKey( ScopedRowKey.fromKey( scope.getApplication(), node ) ).getColumn( COLUMN_NAME ).execute()
.getResult();
-
- return Optional.of( result.getLongValue() );
}
- catch (RuntimeException re ) {
- if(re.getCause().getCause() instanceof NotFoundException) {
- //swallow, there's just no column
- return Optional.absent();
- }
-
- throw re;
+ catch(NotFoundException nfe){
+ //swallow, there's just no column
+ return Optional.absent();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
}
+ return Optional.of( result.getLongValue() );
+
+
}
@@ -193,9 +194,14 @@ public class NodeSerializationImpl implements NodeSerialization, Migration {
}
- final Rows<ScopedRowKey<Id>, Boolean> results = HystrixCassandra
- .user( query.getRowSlice( keys ).withColumnSlice( Collections.singletonList( COLUMN_NAME ) ) )
- .getResult();
+ final Rows<ScopedRowKey<Id>, Boolean> results;
+ try {
+ results = query.getRowSlice( keys ).withColumnSlice( Collections.singletonList( COLUMN_NAME )).execute()
+ .getResult();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
for ( Row<ScopedRowKey<Id>, Boolean> row : results ) {
Column<Boolean> column = row.getColumns().getColumnByName( COLUMN_NAME );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
index 47243e5..a47d528 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.consistency.TimeService;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
@@ -39,6 +38,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import com.netflix.astyanax.MutationBatch;
import com.netflix.hystrix.HystrixCommand;
+import com.netflix.hystrix.HystrixCommandGroupKey;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
@@ -229,7 +229,7 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
/**
* Execute the command in hystrix to avoid slamming cassandra
*/
- new HystrixCommand( HystrixCassandra.ASYNC_GROUP ) {
+ new HystrixCommand( HystrixCommandGroupKey.Factory.asKey("BatchCounterRollup") ) {
@Override
protected Void run() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
index 524a0cf..6934275 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
@@ -33,9 +33,8 @@ import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.graph.GraphFig;
@@ -50,6 +49,7 @@ import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.CompositeBuilder;
@@ -117,24 +117,19 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.scope.getApplication(), key );
+ OperationResult<Column<Boolean>> column = null;
try {
- OperationResult<Column<Boolean>> column = HystrixCassandra.user(
- keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( true ) );
-
- return column.getResult().getLongValue();
+ column = keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( true ).execute();
}
//column not found, return 0
- catch ( RuntimeException re ) {
-
- final Throwable cause = re.getCause();
-
- if(cause != null && cause.getCause() instanceof NotFoundException) {
- return 0;
- }
-
- throw re;
+ catch ( NotFoundException nfe ) {
+ return 0;
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to read from cassandra", e );
}
+ return column.getResult().getLongValue();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 0d34b63..ad64338 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -27,7 +27,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.consistency.TimeService;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.graph.GraphFig;
@@ -49,6 +48,7 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.util.TimeUUIDUtils;
@@ -110,7 +110,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, MIN_SHARD, directedEdgeMeta );
- HystrixCassandra.user( batch );
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
existingShards = Collections.singleton( MIN_SHARD ).iterator();
}
@@ -232,7 +237,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
final MutationBatch batch = this.edgeShardSerialization.writeShardMeta( scope, newShard, directedEdgeMeta );
- HystrixCassandra.user( batch );
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
return true;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 9f8efc8..6135121 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -29,13 +29,7 @@ import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
@@ -43,7 +37,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.consistency.TimeService;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.task.Task;
import org.apache.usergrid.persistence.core.task.TaskExecutor;
@@ -74,6 +67,7 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
/**
@@ -207,8 +201,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
if ( edgeCount % maxWorkSize == 0 ) {
try {
- HystrixCassandra.async( newRowBatch );
- HystrixCassandra.async( deleteRowBatch );
+ newRowBatch.execute();
+ deleteRowBatch.execute();
}
catch ( Throwable t ) {
LOG.error( "Unable to move edges from shard {} to shard {}", sourceShard, targetShard );
@@ -219,8 +213,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
try {
- HystrixCassandra.async( newRowBatch );
- HystrixCassandra.async( deleteRowBatch );
+ newRowBatch.execute();
+ deleteRowBatch.execute();
}
catch ( Throwable t ) {
LOG.error( "Unable to move edges to target shard {}", targetShard );
@@ -260,7 +254,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
- HystrixCassandra.async( shardRemovalRollup );
+ try {
+ shardRemovalRollup.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
LOG.info( "Shard has been fully compacted. Marking shard {} as compacted in Cassandra", targetShard );
@@ -268,7 +267,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
//Overwrite our shard index with a newly created one that has been marked as compacted
Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true );
final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta );
- HystrixCassandra.async( updateMark );
+ try {
+ updateMark.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
resultBuilder.withCompactedShard( compactedShard );
}
@@ -530,40 +534,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
- /**
- * Create a thread pool that will reject work if our audit tasks become overwhelmed
- */
- private final class MaxSizeThreadPool extends ThreadPoolExecutor {
-
- public MaxSizeThreadPool( final int workerSize, final int queueLength ) {
- super( 1, workerSize, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>( queueLength ),
- new CompactionThreadFactory(), new RejectionLogger() );
- }
- }
-
-
- private final class CompactionThreadFactory implements ThreadFactory {
-
- private final AtomicLong threadCounter = new AtomicLong();
-
-
- @Override
- public Thread newThread( final Runnable r ) {
- final long newValue = threadCounter.incrementAndGet();
-
- return new Thread( r, "Graph-Shard-Compaction-" + newValue );
- }
- }
-
-
- private final class RejectionLogger implements RejectedExecutionHandler {
-
-
- @Override
- public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
- LOG.warn( "Audit queue full, rejecting audit task {}", r );
- }
- }
public static final class CompactionResult {