You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/25 16:21:07 UTC
[08/43] git commit: Removed Hystrix. Revert this commit to re-apply
hystrix on this issue is resolved.
Removed Hystrix. Revert this commit to re-apply hystrix on this issue is resolved.
https://github.com/Netflix/Hystrix/pull/209
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/dd37909f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/dd37909f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/dd37909f
Branch: refs/heads/two-dot-o
Commit: dd37909fd2b04b07cce8bdebf3e9a801ad28da32
Parents: 51a9ffd
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Mar 10 11:25:32 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Mar 10 11:25:32 2014 -0700
----------------------------------------------------------------------
stack/corepersistence/collection/pom.xml | 12 +-
.../collection/hystrix/CassandraCommand.java | 74 -------
.../collection/hystrix/CommandUtils.java | 28 ---
.../persistence/collection/rx/ParallelTest.java | 219 -------------------
4 files changed, 6 insertions(+), 327 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dd37909f/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
index 00db576..95aea45 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -201,14 +201,14 @@
<version>${log4j.version}</version>
</dependency>
- <!--Remove custom build once this patch is complete
+ <!-- Re-add once this is done
https://github.com/Netflix/Hystrix/pull/209-->
- <dependency>
- <groupId>com.netflix.hystrix</groupId>
- <artifactId>hystrix-core</artifactId>
- <version>1.3.14-SNAPSHOT</version>
- </dependency>
+ <!--<dependency>-->
+ <!--<groupId>com.netflix.hystrix</groupId>-->
+ <!--<artifactId>hystrix-core</artifactId>-->
+ <!--<version>1.3.13</version>-->
+ <!--</dependency>-->
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dd37909f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
deleted file mode 100644
index 731933a..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.collection.hystrix;
-
-
-import com.netflix.hystrix.HystrixCommand;
-import com.netflix.hystrix.HystrixCommandGroupKey;
-
-import rx.Observable;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Default command that just returns the value handed to it. Useful for creating observables that are subscribed on the
- * correct underlying Hystrix thread pool
- *
- * TODO change this when this PR makes it into head to wrap our observables
- * https://github.com/Netflix/Hystrix/pull/209
- */
-public class CassandraCommand<R> extends HystrixCommand<R> {
-
- public static final String NAME = "CassandraCommand";
-
- public static final HystrixCommandGroupKey GROUP_KEY = HystrixCommandGroupKey.Factory.asKey( NAME );
-
- public static final String THREAD_POOL_SIZE = CommandUtils.getThreadPoolCoreSize( NAME );
-
- public static final String THREAD_POOL_QUEUE = CommandUtils.getThreadPoolMaxQueueSize( NAME );
-
-
- private final R value;
-
-
- private CassandraCommand( final R value ) {
- super( GROUP_KEY );
- this.value = value;
- }
-
-
- @Override
- protected R run() throws Exception {
- return value;
- }
-
-
- /**
- * Get the write command
- *
- * @param readValue The value to observe on
- *
- * @return The value wrapped in a Hystrix observable
- */
- private static <R> Observable<R> toObservable( R readValue ) {
- //create a new command and ensure it's observed on the correct thread scheduler
- return new CassandraCommand<R>( readValue ).toObservable( Schedulers.io() );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dd37909f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CommandUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CommandUtils.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CommandUtils.java
deleted file mode 100644
index b81f79b..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CommandUtils.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.apache.usergrid.persistence.collection.hystrix;
-
-
-/**
- *
- *
- */
-public class CommandUtils {
-
- /**
- * Get the name of the archiaus property for the core thread pool size
- * @param threadPoolName
- * @return
- */
- public static String getThreadPoolCoreSize(String threadPoolName){
- return "hystrix.threadpool."+ threadPoolName + ".coreSize";
- }
-
- /**
- * Get the name of the archiaus property for the max thread pool size
- * @param threadPoolName
- * @return
- */
- public static String getThreadPoolMaxQueueSize(String threadPoolName){
- return "hystrix.threadpool."+ threadPoolName + ".maxQueueSize";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dd37909f/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
deleted file mode 100644
index 434ea26..0000000
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.collection.rx;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.hystrix.CommandUtils;
-
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
-import com.netflix.config.ConfigurationManager;
-import com.netflix.hystrix.HystrixCommand;
-import com.netflix.hystrix.HystrixCommandGroupKey;
-
-import rx.Observable;
-import rx.Scheduler;
-import rx.functions.Func1;
-import rx.functions.FuncN;
-import rx.schedulers.Schedulers;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-/**
- * Tests that provides examples of how to perform more complex RX operations
- */
-public class ParallelTest {
-
- private static final Logger logger = LoggerFactory.getLogger( ParallelTest.class );
-
-
- private static final HystrixCommandGroupKey GROUP_KEY = HystrixCommandGroupKey.Factory.asKey( "TEST_KEY" );
-
-
- public static final String THREAD_POOL_SIZE = CommandUtils.getThreadPoolCoreSize( GROUP_KEY.name() );
-
- public static final String THREAD_POOL_QUEUE = CommandUtils.getThreadPoolMaxQueueSize( GROUP_KEY.name() );
-
-
- /**
- * An example of how an observable that requires a "fan out" then join should execute.
- */
- @Test(timeout = 5000)
- public void concurrentFunctions() {
- final String input = "input";
-
- final int size = 100;
- //since we start at index 0
- final int expected = size - 1;
-
-
- /**
- * QUESTION Using this thread blocks indefinitely. The execution of the Hystrix command happens on the
- * computation
- * Thread if this is used
- */
- // final Scheduler scheduler = Schedulers.threadPoolForComputation();
-
- //use the I/O scheduler to allow enough thread, otherwise our pool will be the same size as the # of cores
- final Scheduler scheduler = Schedulers.io();
-
- //set our size equal
- ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_SIZE, size );
- // ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_SIZE, 10 );
-
- //reject requests we have to queue
- ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_QUEUE, -1 );
-
- //latch used to make each thread block to prove correctness
- final CountDownLatch latch = new CountDownLatch( size );
-
-
- final Multiset<String> set = HashMultiset.create();
-
-
- //create our observable and execute it in the I/O pool since we'll be doing I/O operations
-
- /**
- * QUESTION: Should this use the computation scheduler since all operations (except the hystrix command) are
- * non blocking?
- */
-
- final Observable<String> observable = Observable.from( input ).observeOn( scheduler );
-
-
- Observable<Integer> thing = observable.mapMany( new Func1<String, Observable<Integer>>() {
-
- @Override
- public Observable<Integer> call( final String s ) {
- List<Observable<Integer>> functions = new ArrayList<Observable<Integer>>();
-
- logger.info( "Creating new set of observables in thread {}", Thread.currentThread().getName() );
-
- for ( int i = 0; i < size; i++ ) {
-
-
- final int index = i;
-
- //create a new observable and execute the function on it. These should happen in parallel when
- // a subscription occurs
-
- /**
- * QUESTION: Should this again be the process thread, not the I/O
- */
- Observable<String> newObservable = Observable.from( input ).subscribeOn( scheduler );
-
- Observable<Integer> transformed = newObservable.map( new Func1<String, Integer>() {
-
- @Override
- public Integer call( final String s ) {
-
- final String threadName = Thread.currentThread().getName();
-
- logger.info( "Invoking parallel task in thread {}", threadName );
-
- /**
- * Simulate a Hystrix command making a call to an external resource. Invokes
- * the Hystrix command immediately as the function is invoked. This is currently
- * how we have to call Cassandra.
- *
- * TODO This needs to be re-written and evaluated once this PR is released https://github.com/Netflix/Hystrix/pull/209
- */
- return new HystrixCommand<Integer>( GROUP_KEY ) {
- @Override
- protected Integer run() throws Exception {
-
- final String threadName = Thread.currentThread().getName();
-
- logger.info( "Invoking hystrix task in thread {}", threadName );
-
-
- set.add( threadName );
-
- latch.countDown();
-
- try {
- latch.await();
- }
- catch ( InterruptedException e ) {
- throw new RuntimeException( "Interrupted", e );
- }
-
- assertTrue( isExecutedInThread() );
-
- return index;
- }
- }.execute();
- }
- } );
-
- functions.add( transformed );
- }
-
- /**
- * Execute the functions above and zip the results together
- */
- Observable<Integer> zipped = Observable.zip( functions, new FuncN<Integer>() {
-
- @Override
- public Integer call( final Object... args ) {
-
- logger.info( "Invoking zip in thread {}", Thread.currentThread().getName() );
-
- assertEquals( size, args.length );
-
- for ( int i = 0; i < args.length; i++ ) {
- assertEquals( "Indexes are returned in order", i, args[i] );
- }
-
- //just return our string
- return ( Integer ) args[args.length - 1];
- }
- } );
-
- return zipped;
- }
- } );
-
-
- final Integer last = thing.toBlockingObservable().last();
-
-
- assertEquals( expected, last.intValue() );
-
- assertEquals( size, set.size() );
-
- /**
- * Ensure only 1 entry per thread
- */
- for ( String entry : set.elementSet() ) {
- assertEquals( 1, set.count( entry ) );
- }
- }
-}