You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/25 16:21:07 UTC

[08/43] git commit: Removed Hystrix. Revert this commit to re-apply Hystrix once this issue is resolved.

Removed Hystrix.  Revert this commit to re-apply Hystrix once this issue is resolved.

https://github.com/Netflix/Hystrix/pull/209


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/dd37909f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/dd37909f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/dd37909f

Branch: refs/heads/two-dot-o
Commit: dd37909fd2b04b07cce8bdebf3e9a801ad28da32
Parents: 51a9ffd
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Mar 10 11:25:32 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Mar 10 11:25:32 2014 -0700

----------------------------------------------------------------------
 stack/corepersistence/collection/pom.xml        |  12 +-
 .../collection/hystrix/CassandraCommand.java    |  74 -------
 .../collection/hystrix/CommandUtils.java        |  28 ---
 .../persistence/collection/rx/ParallelTest.java | 219 -------------------
 4 files changed, 6 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dd37909f/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
index 00db576..95aea45 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -201,14 +201,14 @@
       <version>${log4j.version}</version>
     </dependency>
 
-    <!--Remove custom build once this patch is complete
+    <!-- Re-add once this is done
     https://github.com/Netflix/Hystrix/pull/209-->
 
-    <dependency>
-      <groupId>com.netflix.hystrix</groupId>
-      <artifactId>hystrix-core</artifactId>
-      <version>1.3.14-SNAPSHOT</version>
-    </dependency>
+    <!--<dependency>-->
+        <!--<groupId>com.netflix.hystrix</groupId>-->
+        <!--<artifactId>hystrix-core</artifactId>-->
+        <!--<version>1.3.13</version>-->
+    <!--</dependency>-->
 
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dd37909f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
deleted file mode 100644
index 731933a..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.collection.hystrix;
-
-
-import com.netflix.hystrix.HystrixCommand;
-import com.netflix.hystrix.HystrixCommandGroupKey;
-
-import rx.Observable;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Default command that just returns the value handed to it.  Useful for creating observables that are subscribed on the
- * correct underlying Hystrix thread pool
- *
- * TODO change this when this PR makes it into head to wrap our observables
- * https://github.com/Netflix/Hystrix/pull/209
- */
-public class CassandraCommand<R> extends HystrixCommand<R> {
-
-    public static final String NAME = "CassandraCommand";
-
-    public static final HystrixCommandGroupKey GROUP_KEY = HystrixCommandGroupKey.Factory.asKey( NAME );
-
-    public static final String THREAD_POOL_SIZE = CommandUtils.getThreadPoolCoreSize( NAME );
-
-    public static final String THREAD_POOL_QUEUE = CommandUtils.getThreadPoolMaxQueueSize( NAME );
-
-
-    private final R value;
-
-
-    private CassandraCommand( final R value ) {
-        super( GROUP_KEY );
-        this.value = value;
-    }
-
-
-    @Override
-    protected R run() throws Exception {
-        return value;
-    }
-
-
-    /**
-     * Get the write command
-     *
-     * @param readValue The value to observe on
-     *
-     * @return The value wrapped in a Hystrix observable
-     */
-    private static <R> Observable<R> toObservable( R readValue ) {
-        //create a new command and ensure it's observed on the correct thread scheduler
-        return new CassandraCommand<R>( readValue ).toObservable( Schedulers.io() );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dd37909f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CommandUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CommandUtils.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CommandUtils.java
deleted file mode 100644
index b81f79b..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CommandUtils.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.apache.usergrid.persistence.collection.hystrix;
-
-
-/**
- *
- *
- */
-public class CommandUtils {
-
-    /**
-     * Get the name of the archiaus property for the core thread pool size
-     * @param threadPoolName
-     * @return
-     */
-    public static String getThreadPoolCoreSize(String threadPoolName){
-        return "hystrix.threadpool."+ threadPoolName + ".coreSize";
-    }
-
-    /**
-       * Get the name of the archiaus property for the max thread pool size
-       * @param threadPoolName
-       * @return
-       */
-      public static String getThreadPoolMaxQueueSize(String threadPoolName){
-          return "hystrix.threadpool."+ threadPoolName + ".maxQueueSize";
-      }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dd37909f/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
deleted file mode 100644
index 434ea26..0000000
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.collection.rx;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.hystrix.CommandUtils;
-
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
-import com.netflix.config.ConfigurationManager;
-import com.netflix.hystrix.HystrixCommand;
-import com.netflix.hystrix.HystrixCommandGroupKey;
-
-import rx.Observable;
-import rx.Scheduler;
-import rx.functions.Func1;
-import rx.functions.FuncN;
-import rx.schedulers.Schedulers;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-/**
- * Tests that provides examples of how to perform more complex RX operations
- */
-public class ParallelTest {
-
-    private static final Logger logger = LoggerFactory.getLogger( ParallelTest.class );
-
-
-    private static final HystrixCommandGroupKey GROUP_KEY = HystrixCommandGroupKey.Factory.asKey( "TEST_KEY" );
-
-
-    public static final String THREAD_POOL_SIZE = CommandUtils.getThreadPoolCoreSize( GROUP_KEY.name() );
-
-    public static final String THREAD_POOL_QUEUE = CommandUtils.getThreadPoolMaxQueueSize( GROUP_KEY.name() );
-
-
-    /**
-     * An example of how an observable that requires a "fan out" then join should execute.
-     */
-    @Test(timeout = 5000)
-    public void concurrentFunctions() {
-        final String input = "input";
-
-        final int size = 100;
-        //since we start at index 0
-        final int expected = size - 1;
-
-
-        /**
-         * QUESTION Using this thread blocks indefinitely.  The execution of the Hystrix command happens on the
-         * computation
-         * Thread if this is used
-         */
-        //        final Scheduler scheduler = Schedulers.threadPoolForComputation();
-
-        //use the I/O scheduler to allow enough thread, otherwise our pool will be the same size as the # of cores
-        final Scheduler scheduler = Schedulers.io();
-
-        //set our size equal
-        ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_SIZE, size );
-        //        ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_SIZE, 10 );
-
-        //reject requests we have to queue
-        ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_QUEUE, -1 );
-
-        //latch used to make each thread block to prove correctness
-        final CountDownLatch latch = new CountDownLatch( size );
-
-
-        final Multiset<String> set = HashMultiset.create();
-
-
-        //create our observable and execute it in the I/O pool since we'll be doing I/O operations
-
-        /**
-         *  QUESTION: Should this use the computation scheduler since all operations (except the hystrix command) are
-         *  non blocking?
-         */
-
-        final Observable<String> observable = Observable.from( input ).observeOn( scheduler );
-
-
-        Observable<Integer> thing = observable.mapMany( new Func1<String, Observable<Integer>>() {
-
-            @Override
-            public Observable<Integer> call( final String s ) {
-                List<Observable<Integer>> functions = new ArrayList<Observable<Integer>>();
-
-                logger.info( "Creating new set of observables in thread {}", Thread.currentThread().getName() );
-
-                for ( int i = 0; i < size; i++ ) {
-
-
-                    final int index = i;
-
-                    //create a new observable and execute the function on it.  These should happen in parallel when
-                    // a subscription occurs
-
-                    /**
-                     * QUESTION: Should this again be the process thread, not the I/O
-                     */
-                    Observable<String> newObservable = Observable.from( input ).subscribeOn( scheduler );
-
-                    Observable<Integer> transformed = newObservable.map( new Func1<String, Integer>() {
-
-                        @Override
-                        public Integer call( final String s ) {
-
-                            final String threadName = Thread.currentThread().getName();
-
-                            logger.info( "Invoking parallel task in thread {}", threadName );
-
-                            /**
-                             * Simulate a Hystrix command making a call to an external resource.  Invokes
-                             * the Hystrix command immediately as the function is invoked.  This is currently
-                             * how we have to call Cassandra.
-                             *
-                             * TODO This needs to be re-written and evaluated once this PR is released https://github.com/Netflix/Hystrix/pull/209
-                             */
-                            return new HystrixCommand<Integer>( GROUP_KEY ) {
-                                @Override
-                                protected Integer run() throws Exception {
-
-                                    final String threadName = Thread.currentThread().getName();
-
-                                    logger.info( "Invoking hystrix task in thread {}", threadName );
-
-
-                                    set.add( threadName );
-
-                                    latch.countDown();
-
-                                    try {
-                                        latch.await();
-                                    }
-                                    catch ( InterruptedException e ) {
-                                        throw new RuntimeException( "Interrupted", e );
-                                    }
-
-                                    assertTrue( isExecutedInThread() );
-
-                                    return index;
-                                }
-                            }.execute();
-                        }
-                    } );
-
-                    functions.add( transformed );
-                }
-
-                /**
-                 * Execute the functions above and zip the results together
-                 */
-                Observable<Integer> zipped = Observable.zip( functions, new FuncN<Integer>() {
-
-                    @Override
-                    public Integer call( final Object... args ) {
-
-                        logger.info( "Invoking zip in thread {}", Thread.currentThread().getName() );
-
-                        assertEquals( size, args.length );
-
-                        for ( int i = 0; i < args.length; i++ ) {
-                            assertEquals( "Indexes are returned in order", i, args[i] );
-                        }
-
-                        //just return our string
-                        return ( Integer ) args[args.length - 1];
-                    }
-                } );
-
-                return zipped;
-            }
-        } );
-
-
-        final Integer last = thing.toBlockingObservable().last();
-
-
-        assertEquals( expected, last.intValue() );
-
-        assertEquals( size, set.size() );
-
-        /**
-         * Ensure only 1 entry per thread
-         */
-        for ( String entry : set.elementSet() ) {
-            assertEquals( 1, set.count( entry ) );
-        }
-    }
-}