You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2013/12/21 03:26:22 UTC

[3/4] git commit: Merge branch 'master' into netty-default

Merge branch 'master' into netty-default

Conflicts:
	bin/install_zmq.sh
	storm-core/src/clj/backtype/storm/messaging/zmq.clj
	storm-core/src/clj/zilch/mq.clj
	storm-core/test/clj/zilch/test/mq.clj
	storm-netty/project.clj


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/962d5207
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/962d5207
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/962d5207

Branch: refs/heads/master
Commit: 962d520777368709cf30d6214719bfa1b81bbd4c
Parents: b63ed13 7e40f9f
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Dec 16 13:21:19 2013 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Mon Dec 16 13:21:19 2013 -0500

----------------------------------------------------------------------
 LICENSE                                         | 285 +++++++++++++++++++
 LICENSE.html                                    | 261 -----------------
 NOTICE                                          |  12 +-
 README.markdown                                 |  23 +-
 VERSION                                         |   2 +-
 bin/build_modules.sh                            |  16 ++
 bin/build_release.sh                            |  17 ++
 bin/javadoc.sh                                  |  16 ++
 bin/storm                                       |  16 ++
 bin/to_maven.sh                                 |  17 ++
 conf/defaults.yaml                              |  18 ++
 conf/jaas_digest.conf                           |  18 ++
 conf/logback.xml                                |  17 +-
 conf/storm.yaml.example                         |  16 ++
 logback/cluster.xml                             |  18 +-
 project.clj                                     |  15 +
 storm-console-logging/project.clj               |  15 +
 storm-core/project.clj                          |  21 +-
 .../src/clj/backtype/storm/LocalCluster.clj     |  15 +
 storm-core/src/clj/backtype/storm/LocalDRPC.clj |  15 +
 storm-core/src/clj/backtype/storm/bootstrap.clj |  15 +
 storm-core/src/clj/backtype/storm/clojure.clj   |  15 +
 storm-core/src/clj/backtype/storm/cluster.clj   |  15 +
 .../src/clj/backtype/storm/command/activate.clj |  15 +
 .../clj/backtype/storm/command/config_value.clj |  15 +
 .../clj/backtype/storm/command/deactivate.clj   |  15 +
 .../backtype/storm/command/dev_zookeeper.clj    |  15 +
 .../backtype/storm/command/kill_topology.clj    |  15 +
 .../src/clj/backtype/storm/command/list.clj     |  15 +
 .../clj/backtype/storm/command/rebalance.clj    |  15 +
 .../backtype/storm/command/shell_submission.clj |  15 +
 storm-core/src/clj/backtype/storm/config.clj    |  15 +
 .../src/clj/backtype/storm/daemon/acker.clj     |  15 +
 .../backtype/storm/daemon/builtin_metrics.clj   |  15 +
 .../src/clj/backtype/storm/daemon/common.clj    |  15 +
 .../src/clj/backtype/storm/daemon/drpc.clj      |  19 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |  15 +
 .../src/clj/backtype/storm/daemon/logviewer.clj |  15 +
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  15 +
 .../clj/backtype/storm/daemon/supervisor.clj    |  15 +
 .../src/clj/backtype/storm/daemon/task.clj      |  15 +
 .../src/clj/backtype/storm/daemon/worker.clj    |  15 +
 storm-core/src/clj/backtype/storm/disruptor.clj |  15 +
 storm-core/src/clj/backtype/storm/event.clj     |  15 +
 storm-core/src/clj/backtype/storm/log.clj       |  15 +
 .../src/clj/backtype/storm/messaging/loader.clj |  15 +
 .../src/clj/backtype/storm/messaging/local.clj  |  15 +
 .../src/clj/backtype/storm/metric/testing.clj   |  15 +
 .../clj/backtype/storm/process_simulator.clj    |  15 +
 .../storm/scheduler/DefaultScheduler.clj        |  15 +
 .../backtype/storm/scheduler/EvenScheduler.clj  |  15 +
 .../storm/scheduler/IsolationScheduler.clj      |  15 +
 storm-core/src/clj/backtype/storm/stats.clj     |  15 +
 storm-core/src/clj/backtype/storm/testing.clj   |  15 +
 storm-core/src/clj/backtype/storm/testing4j.clj |  15 +
 storm-core/src/clj/backtype/storm/thrift.clj    |  15 +
 storm-core/src/clj/backtype/storm/timer.clj     |  15 +
 storm-core/src/clj/backtype/storm/tuple.clj     |  15 +
 storm-core/src/clj/backtype/storm/ui/core.clj   |  15 +
 .../src/clj/backtype/storm/ui/helpers.clj       |  15 +
 storm-core/src/clj/backtype/storm/util.clj      |  15 +
 storm-core/src/clj/backtype/storm/zookeeper.clj |  15 +
 storm-core/src/clj/storm/trident/testing.clj    |  15 +
 storm-core/src/dev/resources/tester_bolt.py     |  18 ++
 storm-core/src/dev/resources/tester_bolt.rb     |  18 ++
 storm-core/src/dev/resources/tester_spout.py    |  18 ++
 storm-core/src/dev/resources/tester_spout.rb    |  17 ++
 storm-core/src/genthrift.sh                     |  16 ++
 storm-core/src/jvm/backtype/storm/Config.java   |  17 ++
 .../jvm/backtype/storm/ConfigValidation.java    |  17 ++
 .../src/jvm/backtype/storm/Constants.java       |  17 ++
 .../src/jvm/backtype/storm/ILocalCluster.java   |  17 ++
 .../src/jvm/backtype/storm/ILocalDRPC.java      |  17 ++
 .../src/jvm/backtype/storm/StormSubmitter.java  |  17 ++
 .../jvm/backtype/storm/clojure/ClojureBolt.java |  17 ++
 .../backtype/storm/clojure/ClojureSpout.java    |  17 ++
 .../backtype/storm/clojure/RichShellBolt.java   |  17 ++
 .../backtype/storm/clojure/RichShellSpout.java  |  17 ++
 .../storm/coordination/BatchBoltExecutor.java   |  17 ++
 .../coordination/BatchOutputCollector.java      |  17 ++
 .../coordination/BatchOutputCollectorImpl.java  |  17 ++
 .../coordination/BatchSubtopologyBuilder.java   |  17 ++
 .../storm/coordination/CoordinatedBolt.java     |  17 ++
 .../backtype/storm/coordination/IBatchBolt.java |  17 ++
 .../jvm/backtype/storm/daemon/Shutdownable.java |  17 ++
 .../storm/drpc/DRPCInvocationsClient.java       |  17 ++
 .../src/jvm/backtype/storm/drpc/DRPCSpout.java  |  17 ++
 .../src/jvm/backtype/storm/drpc/JoinResult.java |  17 ++
 .../jvm/backtype/storm/drpc/KeyedFairBolt.java  |  17 ++
 .../storm/drpc/LinearDRPCInputDeclarer.java     |  17 ++
 .../storm/drpc/LinearDRPCTopologyBuilder.java   |  17 ++
 .../jvm/backtype/storm/drpc/PrepareRequest.java |  17 ++
 .../jvm/backtype/storm/drpc/ReturnResults.java  |  17 ++
 .../storm/generated/AlreadyAliveException.java  |  17 ++
 .../src/jvm/backtype/storm/generated/Bolt.java  |  17 ++
 .../jvm/backtype/storm/generated/BoltStats.java |  17 ++
 .../storm/generated/ClusterSummary.java         |  17 ++
 .../storm/generated/ComponentCommon.java        |  17 ++
 .../storm/generated/ComponentObject.java        |  17 ++
 .../storm/generated/DRPCExecutionException.java |  17 ++
 .../backtype/storm/generated/DRPCRequest.java   |  17 ++
 .../storm/generated/DistributedRPC.java         |  17 ++
 .../generated/DistributedRPCInvocations.java    |  17 ++
 .../jvm/backtype/storm/generated/ErrorInfo.java |  17 ++
 .../backtype/storm/generated/ExecutorInfo.java  |  17 ++
 .../storm/generated/ExecutorSpecificStats.java  |  17 ++
 .../backtype/storm/generated/ExecutorStats.java |  17 ++
 .../storm/generated/ExecutorSummary.java        |  17 ++
 .../storm/generated/GlobalStreamId.java         |  17 ++
 .../jvm/backtype/storm/generated/Grouping.java  |  17 ++
 .../generated/InvalidTopologyException.java     |  17 ++
 .../backtype/storm/generated/JavaObject.java    |  17 ++
 .../backtype/storm/generated/JavaObjectArg.java |  17 ++
 .../backtype/storm/generated/KillOptions.java   |  17 ++
 .../jvm/backtype/storm/generated/Nimbus.java    |  17 ++
 .../storm/generated/NotAliveException.java      |  17 ++
 .../backtype/storm/generated/NullStruct.java    |  17 ++
 .../storm/generated/RebalanceOptions.java       |  17 ++
 .../storm/generated/ShellComponent.java         |  17 ++
 .../jvm/backtype/storm/generated/SpoutSpec.java |  17 ++
 .../backtype/storm/generated/SpoutStats.java    |  17 ++
 .../storm/generated/StateSpoutSpec.java         |  17 ++
 .../backtype/storm/generated/StormTopology.java |  17 ++
 .../backtype/storm/generated/StreamInfo.java    |  17 ++
 .../backtype/storm/generated/SubmitOptions.java |  17 ++
 .../storm/generated/SupervisorSummary.java      |  17 ++
 .../backtype/storm/generated/TopologyInfo.java  |  17 ++
 .../storm/generated/TopologyInitialStatus.java  |  17 ++
 .../storm/generated/TopologySummary.java        |  17 ++
 .../storm/grouping/CustomStreamGrouping.java    |  17 ++
 .../jvm/backtype/storm/hooks/BaseTaskHook.java  |  17 ++
 .../src/jvm/backtype/storm/hooks/ITaskHook.java |  17 ++
 .../backtype/storm/hooks/info/BoltAckInfo.java  |  17 ++
 .../storm/hooks/info/BoltExecuteInfo.java       |  17 ++
 .../backtype/storm/hooks/info/BoltFailInfo.java |  17 ++
 .../jvm/backtype/storm/hooks/info/EmitInfo.java |  17 ++
 .../backtype/storm/hooks/info/SpoutAckInfo.java |  17 ++
 .../storm/hooks/info/SpoutFailInfo.java         |  17 ++
 .../backtype/storm/messaging/IConnection.java   |  17 ++
 .../jvm/backtype/storm/messaging/IContext.java  |  17 ++
 .../backtype/storm/messaging/TaskMessage.java   |  17 ++
 .../storm/messaging/TransportFactory.java       |  17 ++
 .../backtype/storm/messaging/netty/Client.java  |  17 ++
 .../backtype/storm/messaging/netty/Context.java |  17 ++
 .../storm/messaging/netty/ControlMessage.java   |  17 ++
 .../storm/messaging/netty/MessageBatch.java     |  17 ++
 .../storm/messaging/netty/MessageDecoder.java   |  17 ++
 .../storm/messaging/netty/MessageEncoder.java   |  17 ++
 .../backtype/storm/messaging/netty/Server.java  |  17 ++
 .../messaging/netty/StormClientHandler.java     |  17 ++
 .../netty/StormClientPipelineFactory.java       |  17 ++
 .../messaging/netty/StormServerHandler.java     |  17 ++
 .../netty/StormServerPipelineFactory.java       |  17 ++
 .../storm/metric/LoggingMetricsConsumer.java    |  17 ++
 .../storm/metric/MetricsConsumerBolt.java       |  17 ++
 .../jvm/backtype/storm/metric/SystemBolt.java   |  17 ++
 .../storm/metric/api/AssignableMetric.java      |  17 ++
 .../storm/metric/api/CombinedMetric.java        |  17 ++
 .../backtype/storm/metric/api/CountMetric.java  |  17 ++
 .../backtype/storm/metric/api/ICombiner.java    |  17 ++
 .../jvm/backtype/storm/metric/api/IMetric.java  |  17 ++
 .../storm/metric/api/IMetricsConsumer.java      |  17 ++
 .../jvm/backtype/storm/metric/api/IReducer.java |  17 ++
 .../storm/metric/api/IStatefulObject.java       |  17 ++
 .../backtype/storm/metric/api/MeanReducer.java  |  17 ++
 .../storm/metric/api/MultiCountMetric.java      |  17 ++
 .../storm/metric/api/MultiReducedMetric.java    |  17 ++
 .../storm/metric/api/ReducedMetric.java         |  17 ++
 .../backtype/storm/metric/api/StateMetric.java  |  17 ++
 .../storm/nimbus/DefaultTopologyValidator.java  |  17 ++
 .../storm/nimbus/ITopologyValidator.java        |  17 ++
 .../backtype/storm/planner/CompoundSpout.java   |  17 ++
 .../backtype/storm/planner/CompoundTask.java    |  17 ++
 .../jvm/backtype/storm/planner/TaskBundle.java  |  17 ++
 .../jvm/backtype/storm/scheduler/Cluster.java   |  17 ++
 .../storm/scheduler/ExecutorDetails.java        |  91 +++---
 .../jvm/backtype/storm/scheduler/INimbus.java   |  17 ++
 .../backtype/storm/scheduler/IScheduler.java    |  17 ++
 .../backtype/storm/scheduler/ISupervisor.java   |  17 ++
 .../storm/scheduler/SchedulerAssignment.java    |  97 ++++---
 .../scheduler/SchedulerAssignmentImpl.java      | 201 +++++++------
 .../storm/scheduler/SupervisorDetails.java      |  17 ++
 .../backtype/storm/scheduler/Topologies.java    |  97 ++++---
 .../storm/scheduler/TopologyDetails.java        |  17 ++
 .../backtype/storm/scheduler/WorkerSlot.java    |  17 ++
 .../backtype/storm/security/auth/AuthUtils.java |  17 ++
 .../storm/security/auth/IAuthorizer.java        |  17 ++
 .../storm/security/auth/ITransportPlugin.java   |  17 ++
 .../storm/security/auth/ReqContext.java         |  17 ++
 .../security/auth/SaslTransportPlugin.java      |  17 ++
 .../security/auth/SimpleTransportPlugin.java    |  17 ++
 .../storm/security/auth/ThriftClient.java       |  17 ++
 .../storm/security/auth/ThriftServer.java       |  17 ++
 .../auth/authorizer/DenyAuthorizer.java         |  17 ++
 .../auth/authorizer/NoopAuthorizer.java         |  17 ++
 .../auth/digest/ClientCallbackHandler.java      |  17 ++
 .../auth/digest/DigestSaslTransportPlugin.java  |  17 ++
 .../auth/digest/ServerCallbackHandler.java      |  17 ++
 .../serialization/BlowfishTupleSerializer.java  |  17 ++
 .../storm/serialization/DefaultKryoFactory.java |  17 ++
 .../storm/serialization/IKryoDecorator.java     |  17 ++
 .../storm/serialization/IKryoFactory.java       |  17 ++
 .../storm/serialization/ITupleDeserializer.java |  17 ++
 .../storm/serialization/ITupleSerializer.java   |  17 ++
 .../serialization/KryoTupleDeserializer.java    |  17 ++
 .../serialization/KryoTupleSerializer.java      |  17 ++
 .../serialization/KryoValuesDeserializer.java   |  17 ++
 .../serialization/KryoValuesSerializer.java     |  17 ++
 .../serialization/SerializableSerializer.java   |  17 ++
 .../serialization/SerializationFactory.java     |  17 ++
 .../types/ArrayListSerializer.java              |  17 ++
 .../serialization/types/HashMapSerializer.java  |  17 ++
 .../serialization/types/HashSetSerializer.java  |  17 ++
 .../types/ListDelegateSerializer.java           |  17 ++
 .../storm/spout/IMultiSchemableSpout.java       |  17 ++
 .../backtype/storm/spout/ISchemableSpout.java   |  17 ++
 .../src/jvm/backtype/storm/spout/ISpout.java    |  17 ++
 .../storm/spout/ISpoutOutputCollector.java      |  17 ++
 .../storm/spout/ISpoutWaitStrategy.java         |  17 ++
 .../jvm/backtype/storm/spout/MultiScheme.java   |  17 ++
 .../storm/spout/NothingEmptyEmitStrategy.java   |  17 ++
 .../backtype/storm/spout/RawMultiScheme.java    |  17 ++
 .../src/jvm/backtype/storm/spout/RawScheme.java |  17 ++
 .../src/jvm/backtype/storm/spout/Scheme.java    |  17 ++
 .../storm/spout/SchemeAsMultiScheme.java        |  17 ++
 .../jvm/backtype/storm/spout/ShellSpout.java    |  17 ++
 .../storm/spout/SleepSpoutWaitStrategy.java     |  17 ++
 .../storm/spout/SpoutOutputCollector.java       |  17 ++
 .../jvm/backtype/storm/state/IStateSpout.java   |  17 ++
 .../storm/state/IStateSpoutOutputCollector.java |  17 ++
 .../backtype/storm/state/ISubscribedState.java  |  17 ++
 .../state/ISynchronizeOutputCollector.java      |  17 ++
 .../storm/state/StateSpoutOutputCollector.java  |  17 ++
 .../storm/state/SynchronizeOutputCollector.java |  17 ++
 .../storm/task/GeneralTopologyContext.java      |  17 ++
 .../src/jvm/backtype/storm/task/IBolt.java      |  17 ++
 .../jvm/backtype/storm/task/IErrorReporter.java |  17 ++
 .../backtype/storm/task/IMetricsContext.java    |  17 ++
 .../backtype/storm/task/IOutputCollector.java   |  17 ++
 .../backtype/storm/task/OutputCollector.java    |  17 ++
 .../src/jvm/backtype/storm/task/ShellBolt.java  |  17 ++
 .../backtype/storm/task/TopologyContext.java    |  17 ++
 .../storm/task/WorkerTopologyContext.java       |  17 ++
 .../backtype/storm/testing/AckFailDelegate.java |  17 ++
 .../storm/testing/AckFailMapTracker.java        |  17 ++
 .../jvm/backtype/storm/testing/AckTracker.java  |  17 ++
 .../backtype/storm/testing/BatchNumberList.java |  17 ++
 .../storm/testing/BatchProcessWord.java         |  17 ++
 .../backtype/storm/testing/BatchRepeatA.java    |  17 ++
 .../jvm/backtype/storm/testing/BoltTracker.java |  17 ++
 .../storm/testing/CompleteTopologyParam.java    | 117 ++++----
 .../storm/testing/CountingBatchBolt.java        |  17 ++
 .../storm/testing/CountingCommitBolt.java       |  17 ++
 .../jvm/backtype/storm/testing/FeederSpout.java |  17 ++
 .../jvm/backtype/storm/testing/FixedTuple.java  |  17 ++
 .../backtype/storm/testing/FixedTupleSpout.java |  17 ++
 .../backtype/storm/testing/IdentityBolt.java    |  17 ++
 .../storm/testing/KeyedCountingBatchBolt.java   |  17 ++
 .../testing/KeyedCountingCommitterBolt.java     |  17 ++
 .../storm/testing/KeyedSummingBatchBolt.java    |  17 ++
 .../storm/testing/MemoryTransactionalSpout.java |  17 ++
 .../testing/MemoryTransactionalSpoutMeta.java   |  17 ++
 .../backtype/storm/testing/MkClusterParam.java  |  97 ++++---
 .../backtype/storm/testing/MkTupleParam.java    |  85 +++---
 .../backtype/storm/testing/MockedSources.java   | 103 ++++---
 .../jvm/backtype/storm/testing/NGrouping.java   |  17 ++
 .../storm/testing/NonRichBoltTracker.java       |  17 ++
 .../testing/OpaqueMemoryTransactionalSpout.java |  17 ++
 .../storm/testing/PrepareBatchBolt.java         |  17 ++
 .../backtype/storm/testing/SpoutTracker.java    |  17 ++
 .../storm/testing/TestAggregatesCounter.java    |  17 ++
 .../backtype/storm/testing/TestConfBolt.java    |  17 ++
 .../backtype/storm/testing/TestGlobalCount.java |  17 ++
 .../src/jvm/backtype/storm/testing/TestJob.java |  65 +++--
 .../storm/testing/TestKryoDecorator.java        |  17 ++
 .../backtype/storm/testing/TestPlannerBolt.java |  17 ++
 .../storm/testing/TestPlannerSpout.java         |  17 ++
 .../backtype/storm/testing/TestSerObject.java   |  17 ++
 .../backtype/storm/testing/TestWordCounter.java |  17 ++
 .../backtype/storm/testing/TestWordSpout.java   |  17 ++
 .../backtype/storm/testing/TrackedTopology.java |  51 ++--
 .../storm/testing/TupleCaptureBolt.java         |  17 ++
 .../topology/BaseConfigurationDeclarer.java     |  17 ++
 .../storm/topology/BasicBoltExecutor.java       |  17 ++
 .../storm/topology/BasicOutputCollector.java    |  17 ++
 .../backtype/storm/topology/BoltDeclarer.java   |  17 ++
 .../ComponentConfigurationDeclarer.java         |  17 ++
 .../storm/topology/FailedException.java         |  17 ++
 .../jvm/backtype/storm/topology/IBasicBolt.java |  17 ++
 .../storm/topology/IBasicOutputCollector.java   |  17 ++
 .../jvm/backtype/storm/topology/IComponent.java |  17 ++
 .../jvm/backtype/storm/topology/IRichBolt.java  |  17 ++
 .../jvm/backtype/storm/topology/IRichSpout.java |  17 ++
 .../storm/topology/IRichStateSpout.java         |  17 ++
 .../backtype/storm/topology/InputDeclarer.java  |  17 ++
 .../storm/topology/OutputFieldsDeclarer.java    |  17 ++
 .../storm/topology/OutputFieldsGetter.java      |  17 ++
 .../storm/topology/ReportedFailedException.java |  17 ++
 .../backtype/storm/topology/SpoutDeclarer.java  |  17 ++
 .../storm/topology/TopologyBuilder.java         |  17 ++
 .../storm/topology/base/BaseBasicBolt.java      |  17 ++
 .../storm/topology/base/BaseBatchBolt.java      |  17 ++
 .../storm/topology/base/BaseComponent.java      |  17 ++
 ...BaseOpaquePartitionedTransactionalSpout.java |  17 ++
 .../base/BasePartitionedTransactionalSpout.java |  17 ++
 .../storm/topology/base/BaseRichBolt.java       |  17 ++
 .../storm/topology/base/BaseRichSpout.java      |  17 ++
 .../topology/base/BaseTransactionalBolt.java    |  17 ++
 .../topology/base/BaseTransactionalSpout.java   |  17 ++
 .../storm/transactional/ICommitter.java         |  17 ++
 .../ICommitterTransactionalSpout.java           |  17 ++
 .../transactional/ITransactionalSpout.java      |  17 ++
 .../storm/transactional/TransactionAttempt.java |  17 ++
 .../TransactionalSpoutBatchExecutor.java        |  17 ++
 .../TransactionalSpoutCoordinator.java          |  17 ++
 .../TransactionalTopologyBuilder.java           |  17 ++
 .../IOpaquePartitionedTransactionalSpout.java   |  17 ++
 .../IPartitionedTransactionalSpout.java         |  17 ++
 ...uePartitionedTransactionalSpoutExecutor.java |  17 ++
 .../PartitionedTransactionalSpoutExecutor.java  |  17 ++
 .../state/RotatingTransactionalState.java       |  17 ++
 .../transactional/state/TransactionalState.java |  17 ++
 .../src/jvm/backtype/storm/tuple/Fields.java    |  17 ++
 .../src/jvm/backtype/storm/tuple/MessageId.java |  17 ++
 .../src/jvm/backtype/storm/tuple/Tuple.java     |  17 ++
 .../src/jvm/backtype/storm/tuple/TupleImpl.java |  17 ++
 .../src/jvm/backtype/storm/tuple/Values.java    |  17 ++
 .../storm/utils/BufferFileInputStream.java      |  17 ++
 .../backtype/storm/utils/CRC32OutputStream.java |  17 ++
 .../backtype/storm/utils/ClojureTimerTask.java  |  17 ++
 .../src/jvm/backtype/storm/utils/Container.java |  17 ++
 .../jvm/backtype/storm/utils/DRPCClient.java    |  17 ++
 .../backtype/storm/utils/DisruptorQueue.java    |  17 ++
 .../storm/utils/IndifferentAccessMap.java       |  17 ++
 .../backtype/storm/utils/InprocMessaging.java   |  17 ++
 .../storm/utils/KeyedRoundRobinQueue.java       |  17 ++
 .../jvm/backtype/storm/utils/ListDelegate.java  |  17 ++
 .../jvm/backtype/storm/utils/LocalState.java    |  17 ++
 .../jvm/backtype/storm/utils/MutableInt.java    |  17 ++
 .../jvm/backtype/storm/utils/MutableLong.java   |  17 ++
 .../jvm/backtype/storm/utils/MutableObject.java |  17 ++
 .../jvm/backtype/storm/utils/NimbusClient.java  |  17 ++
 .../storm/utils/RegisteredGlobalState.java      |  17 ++
 .../jvm/backtype/storm/utils/RotatingMap.java   |  17 ++
 .../backtype/storm/utils/ServiceRegistry.java   |  17 ++
 .../jvm/backtype/storm/utils/ShellProcess.java  |  17 ++
 .../storm/utils/ThriftTopologyUtils.java        |  17 ++
 .../src/jvm/backtype/storm/utils/Time.java      |  17 ++
 .../jvm/backtype/storm/utils/TimeCacheMap.java  |  17 ++
 .../src/jvm/backtype/storm/utils/Utils.java     |  17 ++
 .../backtype/storm/utils/VersionedStore.java    |  17 ++
 .../storm/utils/WindowedTimeThrottler.java      |  17 ++
 .../jvm/backtype/storm/utils/WritableUtils.java |  17 ++
 .../backtype/storm/utils/ZookeeperAuthInfo.java |  17 ++
 storm-core/src/jvm/storm/trident/JoinType.java  |  17 ++
 storm-core/src/jvm/storm/trident/Stream.java    |  17 ++
 .../src/jvm/storm/trident/TridentState.java     |  17 ++
 .../src/jvm/storm/trident/TridentTopology.java  |  17 ++
 .../trident/drpc/ReturnResultsReducer.java      |  17 ++
 .../fluent/ChainedAggregatorDeclarer.java       |  17 ++
 .../fluent/ChainedFullAggregatorDeclarer.java   |  17 ++
 .../ChainedPartitionAggregatorDeclarer.java     |  17 ++
 .../trident/fluent/GlobalAggregationScheme.java |  17 ++
 .../jvm/storm/trident/fluent/GroupedStream.java |  17 ++
 .../trident/fluent/IAggregatableStream.java     |  17 ++
 .../fluent/IChainedAggregatorDeclarer.java      |  17 ++
 .../jvm/storm/trident/fluent/UniqueIdGen.java   |  17 ++
 .../jvm/storm/trident/graph/GraphGrouper.java   |  17 ++
 .../src/jvm/storm/trident/graph/Group.java      |  17 ++
 .../jvm/storm/trident/operation/Aggregator.java |  17 ++
 .../jvm/storm/trident/operation/Assembly.java   |  17 ++
 .../storm/trident/operation/BaseAggregator.java |  17 ++
 .../jvm/storm/trident/operation/BaseFilter.java |  17 ++
 .../storm/trident/operation/BaseFunction.java   |  17 ++
 .../trident/operation/BaseMultiReducer.java     |  17 ++
 .../storm/trident/operation/BaseOperation.java  |  17 ++
 .../trident/operation/CombinerAggregator.java   |  17 ++
 .../storm/trident/operation/EachOperation.java  |  17 ++
 .../src/jvm/storm/trident/operation/Filter.java |  17 ++
 .../jvm/storm/trident/operation/Function.java   |  17 ++
 .../trident/operation/GroupedMultiReducer.java  |  17 ++
 .../storm/trident/operation/MultiReducer.java   |  17 ++
 .../jvm/storm/trident/operation/Operation.java  |  17 ++
 .../trident/operation/ReducerAggregator.java    |  17 ++
 .../trident/operation/TridentCollector.java     |  17 ++
 .../operation/TridentMultiReducerContext.java   |  17 ++
 .../operation/TridentOperationContext.java      |  17 ++
 .../storm/trident/operation/builtin/Count.java  |  17 ++
 .../storm/trident/operation/builtin/Debug.java  |  17 ++
 .../storm/trident/operation/builtin/Equals.java |  17 ++
 .../trident/operation/builtin/FilterNull.java   |  17 ++
 .../storm/trident/operation/builtin/FirstN.java |  17 ++
 .../storm/trident/operation/builtin/MapGet.java |  17 ++
 .../storm/trident/operation/builtin/Negate.java |  17 ++
 .../trident/operation/builtin/SnapshotGet.java  |  17 ++
 .../storm/trident/operation/builtin/Sum.java    |  17 ++
 .../operation/builtin/TupleCollectionGet.java   |  17 ++
 .../operation/impl/CaptureCollector.java        |  17 ++
 .../operation/impl/ChainedAggregatorImpl.java   |  17 ++
 .../trident/operation/impl/ChainedResult.java   |  17 ++
 .../operation/impl/CombinerAggStateUpdater.java |  17 ++
 .../impl/CombinerAggregatorCombineImpl.java     |  17 ++
 .../impl/CombinerAggregatorInitImpl.java        |  17 ++
 .../trident/operation/impl/FilterExecutor.java  |  17 ++
 .../operation/impl/GlobalBatchToPartition.java  |  17 ++
 .../trident/operation/impl/GroupCollector.java  |  17 ++
 .../operation/impl/GroupedAggregator.java       |  17 ++
 .../impl/GroupedMultiReducerExecutor.java       |  17 ++
 .../operation/impl/IdentityMultiReducer.java    |  17 ++
 .../impl/IndexHashBatchToPartition.java         |  17 ++
 .../operation/impl/JoinerMultiReducer.java      |  17 ++
 .../operation/impl/ReducerAggStateUpdater.java  |  17 ++
 .../operation/impl/ReducerAggregatorImpl.java   |  17 ++
 .../storm/trident/operation/impl/Result.java    |  17 ++
 .../operation/impl/SingleEmitAggregator.java    |  17 ++
 .../trident/operation/impl/TrueFilter.java      |  17 ++
 .../storm/trident/partition/GlobalGrouping.java |  17 ++
 .../trident/partition/IdentityGrouping.java     |  17 ++
 .../trident/partition/IndexHashGrouping.java    |  17 ++
 .../storm/trident/planner/BridgeReceiver.java   |  17 ++
 .../src/jvm/storm/trident/planner/Node.java     |  17 ++
 .../storm/trident/planner/NodeStateInfo.java    |  17 ++
 .../storm/trident/planner/PartitionNode.java    |  17 ++
 .../storm/trident/planner/ProcessorContext.java |  17 ++
 .../storm/trident/planner/ProcessorNode.java    |  17 ++
 .../jvm/storm/trident/planner/SpoutNode.java    |  17 ++
 .../storm/trident/planner/SubtopologyBolt.java  |  17 ++
 .../storm/trident/planner/TridentProcessor.java |  17 ++
 .../storm/trident/planner/TupleReceiver.java    |  17 ++
 .../planner/processor/AggregateProcessor.java   |  17 ++
 .../planner/processor/AppendCollector.java      |  17 ++
 .../planner/processor/EachProcessor.java        |  17 ++
 .../planner/processor/FreshCollector.java       |  17 ++
 .../processor/MultiReducerProcessor.java        |  17 ++
 .../processor/PartitionPersistProcessor.java    |  17 ++
 .../planner/processor/ProjectedProcessor.java   |  17 ++
 .../planner/processor/StateQueryProcessor.java  |  17 ++
 .../planner/processor/TridentContext.java       |  17 ++
 .../storm/trident/spout/BatchSpoutExecutor.java |  17 ++
 .../src/jvm/storm/trident/spout/IBatchID.java   |  17 ++
 .../jvm/storm/trident/spout/IBatchSpout.java    |  17 ++
 .../trident/spout/ICommitterTridentSpout.java   |  17 ++
 .../spout/IOpaquePartitionedTridentSpout.java   |  17 ++
 .../trident/spout/IPartitionedTridentSpout.java |  17 ++
 .../storm/trident/spout/ISpoutPartition.java    |  17 ++
 .../jvm/storm/trident/spout/ITridentSpout.java  |  17 ++
 .../OpaquePartitionedTridentSpoutExecutor.java  |  17 ++
 .../spout/PartitionedTridentSpoutExecutor.java  |  17 ++
 .../trident/spout/RichSpoutBatchExecutor.java   |  17 ++
 .../storm/trident/spout/RichSpoutBatchId.java   |  17 ++
 .../spout/RichSpoutBatchIdSerializer.java       |  17 ++
 .../trident/spout/RichSpoutBatchTriggerer.java  |  17 ++
 .../trident/spout/TridentSpoutCoordinator.java  |  17 ++
 .../trident/spout/TridentSpoutExecutor.java     |  17 ++
 .../storm/trident/state/BaseQueryFunction.java  |  17 ++
 .../storm/trident/state/BaseStateUpdater.java   |  17 ++
 .../trident/state/CombinerValueUpdater.java     |  17 ++
 .../storm/trident/state/ITupleCollection.java   |  17 ++
 .../state/JSONNonTransactionalSerializer.java   |  17 ++
 .../trident/state/JSONOpaqueSerializer.java     |  17 ++
 .../state/JSONTransactionalSerializer.java      |  17 ++
 .../jvm/storm/trident/state/OpaqueValue.java    |  17 ++
 .../jvm/storm/trident/state/QueryFunction.java  |  17 ++
 .../jvm/storm/trident/state/ReadOnlyState.java  |  17 ++
 .../trident/state/ReducerValueUpdater.java      |  17 ++
 .../src/jvm/storm/trident/state/Serializer.java |  17 ++
 .../src/jvm/storm/trident/state/State.java      |  17 ++
 .../jvm/storm/trident/state/StateFactory.java   |  17 ++
 .../src/jvm/storm/trident/state/StateSpec.java  |  17 ++
 .../src/jvm/storm/trident/state/StateType.java  |  17 ++
 .../jvm/storm/trident/state/StateUpdater.java   |  17 ++
 .../storm/trident/state/TransactionalValue.java |  17 ++
 .../jvm/storm/trident/state/ValueUpdater.java   |  17 ++
 .../trident/state/map/CachedBatchReadsMap.java  |  17 ++
 .../jvm/storm/trident/state/map/CachedMap.java  |  17 ++
 .../storm/trident/state/map/IBackingMap.java    |  17 ++
 .../state/map/MapCombinerAggStateUpdater.java   |  17 ++
 .../state/map/MapReducerAggStateUpdater.java    |  17 ++
 .../jvm/storm/trident/state/map/MapState.java   |  17 ++
 .../state/map/MicroBatchIBackingMap.java        |  17 ++
 .../trident/state/map/NonTransactionalMap.java  |  17 ++
 .../jvm/storm/trident/state/map/OpaqueMap.java  |  17 ++
 .../trident/state/map/ReadOnlyMapState.java     |  17 ++
 .../trident/state/map/SnapshottableMap.java     |  17 ++
 .../trident/state/map/TransactionalMap.java     |  17 ++
 .../state/snapshot/ReadOnlySnapshottable.java   |  17 ++
 .../trident/state/snapshot/Snapshottable.java   |  17 ++
 .../trident/testing/CountAsAggregator.java      |  17 ++
 .../storm/trident/testing/FeederBatchSpout.java |  17 ++
 .../testing/FeederCommitterBatchSpout.java      |  17 ++
 .../storm/trident/testing/FixedBatchSpout.java  |  17 ++
 .../src/jvm/storm/trident/testing/IFeeder.java  |  17 ++
 .../trident/testing/LRUMemoryMapState.java      |  17 ++
 .../storm/trident/testing/MemoryBackingMap.java |  17 ++
 .../storm/trident/testing/MemoryMapState.java   |  17 ++
 .../storm/trident/testing/MockTridentTuple.java |  17 ++
 .../src/jvm/storm/trident/testing/Split.java    |  17 ++
 .../jvm/storm/trident/testing/StringLength.java |  17 ++
 .../jvm/storm/trident/testing/TrueFilter.java   |  17 ++
 .../jvm/storm/trident/testing/TuplifyArgs.java  |  17 ++
 .../jvm/storm/trident/topology/BatchInfo.java   |  17 ++
 .../trident/topology/ITridentBatchBolt.java     |  17 ++
 .../topology/MasterBatchCoordinator.java        |  17 ++
 .../trident/topology/TransactionAttempt.java    |  17 ++
 .../trident/topology/TridentBoltExecutor.java   |  17 ++
 .../topology/TridentTopologyBuilder.java        |  17 ++
 .../state/RotatingTransactionalState.java       |  17 ++
 .../topology/state/TransactionalState.java      |  17 ++
 .../src/jvm/storm/trident/tuple/ComboList.java  |  17 ++
 .../src/jvm/storm/trident/tuple/ConsList.java   |  17 ++
 .../jvm/storm/trident/tuple/TridentTuple.java   |  17 ++
 .../storm/trident/tuple/TridentTupleView.java   |  17 ++
 .../jvm/storm/trident/tuple/ValuePointer.java   |  17 ++
 .../storm/trident/util/ErrorEdgeFactory.java    |  17 ++
 .../src/jvm/storm/trident/util/IndexedEdge.java |  17 ++
 .../src/jvm/storm/trident/util/LRUMap.java      |  17 ++
 .../jvm/storm/trident/util/TridentUtils.java    |  17 ++
 storm-core/src/multilang/py/storm.py            |  18 ++
 storm-core/src/multilang/rb/storm.rb            |  18 ++
 storm-core/src/storm.thrift                     |  23 ++
 storm-core/src/ui/public/css/style.css          |  17 ++
 storm-core/src/ui/public/js/script.js           |  17 ++
 .../test/clj/backtype/storm/clojure_test.clj    |  15 +
 .../test/clj/backtype/storm/cluster_test.clj    |  15 +
 .../test/clj/backtype/storm/config_test.clj     |  15 +
 .../test/clj/backtype/storm/drpc_test.clj       |  15 +
 .../test/clj/backtype/storm/fields_test.clj     |  15 +
 .../test/clj/backtype/storm/grouping_test.clj   |  15 +
 .../clj/backtype/storm/integration_test.clj     |  15 +
 .../clj/backtype/storm/local_state_test.clj     |  15 +
 .../storm/messaging/netty_integration_test.clj  |  15 +
 .../storm/messaging/netty_unit_test.clj         |  15 +
 .../test/clj/backtype/storm/messaging_test.clj  |  15 +
 .../test/clj/backtype/storm/metrics_test.clj    |  15 +
 .../test/clj/backtype/storm/multilang_test.clj  |  15 +
 .../test/clj/backtype/storm/nimbus_test.clj     |  15 +
 .../test/clj/backtype/storm/scheduler_test.clj  |  15 +
 .../storm/security/auth/AuthUtils_test.clj      |  15 +
 .../storm/security/auth/ReqContext_test.clj     |  15 +
 .../security/auth/SaslTransportPlugin_test.clj  |  15 +
 .../storm/security/auth/ThriftClient_test.clj   |  15 +
 .../storm/security/auth/ThriftServer_test.clj   |  15 +
 .../backtype/storm/security/auth/auth_test.clj  |  15 +
 .../BlowfishTupleSerializer_test.clj            |  15 +
 .../serialization/SerializationFactory_test.clj |  15 +
 .../clj/backtype/storm/serialization_test.clj   |  15 +
 .../clj/backtype/storm/subtopology_test.clj     |  15 +
 .../test/clj/backtype/storm/supervisor_test.clj |  15 +
 .../test/clj/backtype/storm/testing4j_test.clj  |  15 +
 .../test/clj/backtype/storm/tick_tuple_test.clj |  15 +
 .../clj/backtype/storm/transactional_test.clj   |  15 +
 .../test/clj/backtype/storm/tuple_test.clj      |  15 +
 .../test/clj/backtype/storm/utils_test.clj      |  15 +
 .../clj/backtype/storm/versioned_store_test.clj |  15 +
 .../test/clj/storm/trident/integration_test.clj |  15 +
 .../test/clj/storm/trident/state_test.clj       |  15 +
 .../test/clj/storm/trident/tuple_test.clj       |  15 +
 storm-lib/project.clj                           |  15 +
 558 files changed, 9974 insertions(+), 695 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/conf/defaults.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/project.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index c2b391a,0000000..d765e71
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@@ -1,204 -1,0 +1,221 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import backtype.storm.Config;
 +import backtype.storm.messaging.IConnection;
 +import backtype.storm.messaging.TaskMessage;
 +import backtype.storm.utils.Utils;
 +import org.jboss.netty.bootstrap.ClientBootstrap;
 +import org.jboss.netty.channel.Channel;
 +import org.jboss.netty.channel.ChannelFactory;
 +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.net.InetSocketAddress;
 +import java.util.Map;
 +import java.util.Random;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.LinkedBlockingQueue;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +/**
 + * Client end of a Netty messaging connection to a single remote host/port.
 + *
 + * send() only enqueues messages; they are drained in batches via takeMessages().
 + * Connection attempts are retried by reconnect() with a randomized exponential
 + * back-off until max_retries is exceeded, at which point the client closes itself.
 + */
 +class Client implements IConnection {
 +    private static final Logger LOG = LoggerFactory.getLogger(Client.class);
 +    private final int max_retries;
 +    private final int base_sleep_ms;
 +    private final int max_sleep_ms;
 +    private final LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
 +    private final AtomicReference<Channel> channelRef;
 +    private final ClientBootstrap bootstrap;
 +    private final InetSocketAddress remote_addr;
 +    private final AtomicInteger retries;
 +    private final Random random = new Random();
 +    private final ChannelFactory factory;
 +    private final int buffer_size;
 +    private final AtomicBoolean being_closed;
 +
 +    /**
 +     * Read the Netty settings from the Storm configuration, build the bootstrap
 +     * and start the first (asynchronous) connection attempt.
 +     *
 +     * @param storm_conf Storm configuration holding the STORM_MESSAGING_NETTY_* settings
 +     * @param host       remote host to connect to
 +     * @param port       remote port to connect to
 +     */
 +    @SuppressWarnings("rawtypes")
 +    Client(Map storm_conf, String host, int port) {
 +        message_queue = new LinkedBlockingQueue<Object>();
 +        retries = new AtomicInteger(0);
 +        channelRef = new AtomicReference<Channel>(null);
 +        being_closed = new AtomicBoolean(false);
 +
 +        // Configure
 +        buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
 +        // capped at 30 so that (1 << retries) in getSleepTimeMs() stays a positive int
 +        max_retries = Math.min(30, Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)));
 +        base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
 +        max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
 +        int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
 +
 +        if (maxWorkers > 0) {
 +            factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
 +        } else {
 +            factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
 +        }
 +        bootstrap = new ClientBootstrap(factory);
 +        bootstrap.setOption("tcpNoDelay", true);
 +        bootstrap.setOption("sendBufferSize", buffer_size);
 +        bootstrap.setOption("keepAlive", true);
 +
 +        // Set up the pipeline factory.
 +        bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
 +
 +        // Start the connection attempt.
 +        remote_addr = new InetSocketAddress(host, port);
 +        bootstrap.connect(remote_addr);
 +    }
 +
 +    /**
 +     * Retry the connection using an exponential back-off policy.
 +     *
 +     * Sleeps on the calling thread before starting a new connection attempt.
 +     * Once more than max_retries attempts have been made the client is closed.
 +     */
 +    void reconnect() {
 +        try {
 +            int tried_count = retries.incrementAndGet();
 +            if (tried_count <= max_retries) {
 +                Thread.sleep(getSleepTimeMs());
 +                LOG.info("Reconnect ... [{}]", tried_count);
 +                bootstrap.connect(remote_addr);
 +                LOG.debug("connection started...");
 +            } else {
 +                LOG.warn("Remote address is not reachable. We will close this client.");
 +                close();
 +            }
 +        } catch (InterruptedException e) {
 +            LOG.warn("connection failed", e);
 +            // keep the interruption visible to whoever owns this thread
 +            Thread.currentThread().interrupt();
 +        }
 +    }
 +
 +    /**
 +     * # of milliseconds to wait per exponential back-off policy.
 +     *
 +     * @return a value in [0, max_sleep_ms]; never negative, so it is always a
 +     *         legal argument for Thread.sleep()
 +     */
 +    private int getSleepTimeMs()
 +    {
 +        // clamp the exponent so the shift can neither go negative nor overflow an int
 +        int exponent = Math.max(0, Math.min(30, retries.get()));
 +        int backoff = 1 << exponent;
 +        // multiply as long: base_sleep_ms * 2^30 does not fit in an int, and the
 +        // wrapped (negative) product would make Thread.sleep() throw
 +        long sleepMs = (long) base_sleep_ms * Math.max(1, random.nextInt(backoff));
 +        if ( sleepMs > max_sleep_ms )
 +            sleepMs = max_sleep_ms;
 +        if ( sleepMs < 0 )
 +            sleepMs = 0;
 +        // safe narrowing: 0 <= sleepMs <= max_sleep_ms at this point, or sleepMs is 0
 +        return (int) sleepMs;
 +    }
 +
 +    /**
 +     * Enqueue a task message to be sent to server
 +     *
 +     * @param task    id of the destination task
 +     * @param message serialized payload
 +     * @throws RuntimeException if the client is being closed, or if the calling
 +     *                          thread is interrupted while enqueueing
 +     */
 +    public void send(int task, byte[] message) {
 +        //throw exception if the client is being closed
 +        if (being_closed.get()) {
 +            throw new RuntimeException("Client is being closed, and does not take requests any more");
 +        }
 +
 +        try {
 +            message_queue.put(new TaskMessage(task, message));
 +        } catch (InterruptedException e) {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    /**
 +     * Take enqueued messages from the queue, up to the batch size limit.
 +     *
 +     * Blocks until at least one message is available. The first message is always
 +     * added, even if it alone exceeds buffer_size; further messages are added only
 +     * while they fit. A CLOSE message always terminates the batch.
 +     *
 +     * @return a non-empty batch
 +     * @throws InterruptedException if interrupted while waiting for a message
 +     */
 +    MessageBatch takeMessages()  throws InterruptedException {
 +        //1st message
 +        MessageBatch batch = new MessageBatch(buffer_size);
 +        Object msg = message_queue.take();
 +        batch.add(msg);
 +
 +        //we will discard any message after CLOSE
 +        if (msg==ControlMessage.CLOSE_MESSAGE)
 +            return batch;
 +
 +        while (!batch.isFull()) {
 +            //peek the next message
 +            msg = message_queue.peek();
 +            //no more messages
 +            if (msg == null) break;
 +
 +            //we will discard any message after CLOSE
 +            if (msg==ControlMessage.CLOSE_MESSAGE) {
 +                message_queue.take();
 +                batch.add(msg);
 +                break;
 +            }
 +
 +            //try to add this msg into batch
 +            if (!batch.tryAdd((TaskMessage) msg))
 +                break;
 +
 +            //remove this message
 +            message_queue.take();
 +        }
 +
 +        return batch;
 +    }
 +
 +    /**
 +     * gracefully close this client.
 +     *
 +     * We will send all existing requests, and then invoke close_n_release() method.
 +     * Calling close() more than once has no further effect.
 +     */
 +    public synchronized void close() {
 +        // flip the flag first so send() stops accepting requests on every path,
 +        // including the interrupted one below
 +        if (!being_closed.compareAndSet(false, true))
 +            return;
 +
 +        //enqueue a CLOSE message so that shutdown() will be invoked
 +        try {
 +            message_queue.put(ControlMessage.CLOSE_MESSAGE);
 +        } catch (InterruptedException e) {
 +            // could not enqueue CLOSE: release resources right away
 +            close_n_release();
 +            Thread.currentThread().interrupt();
 +        }
 +    }
 +
 +    /**
 +     * close_n_release() is invoked after all messages have been sent.
 +     *
 +     * Closes the current channel (if any) and releases the channel factory's
 +     * resources on a separate thread.
 +     */
 +    void  close_n_release() {
 +        // read the reference once so the null check and the close() hit the same channel
 +        Channel channel = channelRef.get();
 +        if (channel != null)
 +            channel.close().awaitUninterruptibly();
 +
 +        //we need to release resources
 +        new Thread(new Runnable() {
 +            @Override
 +            public void run() {
 +                factory.releaseExternalResources();
 +            }}).start();
 +    }
 +
 +    /**
 +     * Not supported on the client side.
 +     *
 +     * @throws RuntimeException always
 +     */
 +    public TaskMessage recv(int flags) {
 +        throw new RuntimeException("Client connection should not receive any messages");
 +    }
 +
 +    /**
 +     * Record the channel currently used by this client.
 +     *
 +     * @param channel the established channel, or null to clear the reference
 +     */
 +    void setChannel(Channel channel) {
 +        channelRef.set(channel);
 +        //reset retries
 +        if (channel != null)
 +            retries.set(0);
 +    }
 +
 +}
 +
 +
 +
 +

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 018e0f9,0000000..3e09dd1
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@@ -1,50 -1,0 +1,67 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import backtype.storm.messaging.IConnection;
 +import backtype.storm.messaging.IContext;
 +
 +import java.util.Map;
 +import java.util.Vector;
 +
 +/**
 + * Netty implementation of IContext: creates Server and Client connections and
 + * remembers them so that term() can close them all.
 + */
 +public class Context implements IContext {
 +    @SuppressWarnings("rawtypes")
 +    private Map storm_conf;
 +    private volatile Vector<IConnection> connections;
 +
 +    /**
 +     * initialization per Storm configuration
 +     *
 +     * @param storm_conf configuration handed to every Server/Client created later
 +     */
 +    @SuppressWarnings("rawtypes")
 +    public void prepare(Map storm_conf) {
 +       this.storm_conf = storm_conf;
 +       connections = new Vector<IConnection>();
 +    }
 +
 +    /**
 +     * establish a server with a binding port
 +     *
 +     * @param storm_id not used by this implementation
 +     * @param port     local port to bind
 +     * @return the new server connection
 +     */
 +    public IConnection bind(String storm_id, int port) {
 +        IConnection server = new Server(storm_conf, port);
 +        connections.add(server);
 +        return server;
 +    }
 +
 +    /**
 +     * establish a connection to a remote server
 +     *
 +     * @param storm_id not used by this implementation
 +     * @param host     remote host
 +     * @param port     remote port
 +     * @return the new client connection
 +     */
 +    public IConnection connect(String storm_id, String host, int port) {
 +        IConnection client =  new Client(storm_conf, host, port);
 +        connections.add(client);
 +        return client;
 +    }
 +
 +    /**
 +     * terminate this context: close every connection created through it.
 +     *
 +     * Does nothing if the context was never prepared or was already terminated.
 +     */
 +    public void term() {
 +        Vector<IConnection> current = connections;
 +        if (current == null)
 +            return;
 +
 +        // iterate over a snapshot so a concurrent bind()/connect() cannot
 +        // invalidate the iteration
 +        for (IConnection conn : current.toArray(new IConnection[0])) {
 +            conn.close();
 +        }
 +        connections = null;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
index 4cc2040,0000000..a552cf7
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
@@@ -1,50 -1,0 +1,67 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import org.jboss.netty.buffer.ChannelBuffer;
 +import org.jboss.netty.buffer.ChannelBufferOutputStream;
 +import org.jboss.netty.buffer.ChannelBuffers;
 +
 +/**
 + * Control codes exchanged over the Netty messaging channel.
 + * Each constant is encoded on the wire as a single short.
 + */
 +enum ControlMessage {
 +    CLOSE_MESSAGE((short)-100),
 +    EOB_MESSAGE((short)-201),
 +    OK_RESPONSE((short)-200),
 +    FAILURE_RESPONSE((short)-400);
 +
 +    /** wire value of this control message */
 +    private final short code;
 +
 +    // enum constructors are implicitly private
 +    ControlMessage(short code) {
 +        this.code = code;
 +    }
 +
 +    /**
 +     * Look up the control message matching an encoded status code.
 +     * @param encoded the short read from the wire
 +     * @return the matching constant, or null when the value is not a control code
 +     */
 +    static ControlMessage mkMessage(short encoded) {
 +        ControlMessage[] known = ControlMessage.values();
 +        for (int i = 0; i < known.length; i++) {
 +            if (known[i].code == encoded) {
 +                return known[i];
 +            }
 +        }
 +        return null;
 +    }
 +
 +    /**
 +     * @return number of bytes used by the encoded form (one short)
 +     */
 +    int encodeLength() {
 +        return Short.SIZE / Byte.SIZE;
 +    }
 +
 +    /**
 +     * Encode this control message into a newly allocated direct channel buffer.
 +     * @return buffer holding the encoded code
 +     * @throws Exception if writing to the buffer fails
 +     */
 +    ChannelBuffer buffer() throws Exception {
 +        ChannelBuffer target = ChannelBuffers.directBuffer(encodeLength());
 +        ChannelBufferOutputStream out = new ChannelBufferOutputStream(target);
 +        write(out);
 +        out.close();
 +        return out.buffer();
 +    }
 +
 +    /**
 +     * Append the encoded form of this control message to a stream.
 +     * @param bout destination stream
 +     * @throws Exception if the write fails
 +     */
 +    void write(ChannelBufferOutputStream bout) throws Exception {
 +        bout.writeShort(code);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
index a9d46a2,0000000..9d287e4
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@@ -1,151 -1,0 +1,168 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import backtype.storm.messaging.TaskMessage;
 +import org.jboss.netty.buffer.ChannelBuffer;
 +import org.jboss.netty.buffer.ChannelBufferOutputStream;
 +import org.jboss.netty.buffer.ChannelBuffers;
 +
 +import java.util.ArrayList;
 +
 +/**
 + * A group of TaskMessage/ControlMessage objects that is encoded into one buffer.
 + *
 + * encoded_length tracks the number of bytes buffer() will produce, including the
 + * trailing END_OF_BATCH indicator, so it must change only when the message list
 + * actually changes.
 + */
 +class MessageBatch {
 +    private final int buffer_size;
 +    private final ArrayList<Object> msgs;
 +    private int encoded_length;
 +
 +    /**
 +     * @param buffer_size size limit (in bytes) checked by tryAdd() and isFull()
 +     */
 +    MessageBatch(int buffer_size) {
 +        this.buffer_size = buffer_size;
 +        msgs = new ArrayList<Object>();
 +        // reserve room for the END_OF_BATCH indicator appended by buffer()
 +        encoded_length = ControlMessage.EOB_MESSAGE.encodeLength();
 +    }
 +
 +    /**
 +     * Add a message unconditionally (no size check).
 +     *
 +     * @param obj a TaskMessage or ControlMessage
 +     * @throws RuntimeException if obj is null or of any other type
 +     */
 +    void add(Object obj) {
 +        if (obj == null)
 +            throw new RuntimeException("null object forbidded in message batch");
 +
 +        if (obj instanceof TaskMessage) {
 +            TaskMessage msg = (TaskMessage)obj;
 +            msgs.add(msg);
 +            encoded_length += msgEncodeLength(msg);
 +            return;
 +        }
 +
 +        if (obj instanceof ControlMessage) {
 +            ControlMessage msg = (ControlMessage)obj;
 +            msgs.add(msg);
 +            encoded_length += msg.encodeLength();
 +            return;
 +        }
 +
 +        throw new RuntimeException("Unsuppoted object type "+obj.getClass().getName());
 +    }
 +
 +    /**
 +     * Remove a message from the batch.
 +     *
 +     * Null, unsupported types and objects that are not in the batch are ignored;
 +     * the encoded length is reduced only when something was really removed.
 +     *
 +     * @param obj the message to remove
 +     */
 +    void remove(Object obj) {
 +        if (obj == null) return;
 +
 +        if (obj instanceof TaskMessage) {
 +            TaskMessage msg = (TaskMessage)obj;
 +            // only adjust the accounting if the message was present
 +            if (msgs.remove(msg))
 +                encoded_length -= msgEncodeLength(msg);
 +            return;
 +        }
 +
 +        if (obj instanceof ControlMessage) {
 +            ControlMessage msg = (ControlMessage)obj;
 +            // only adjust the accounting if the message was present
 +            if (msgs.remove(msg))
 +                encoded_length -= msg.encodeLength();
 +            return;
 +        }
 +    }
 +
 +    /**
 +     * @param index position in the batch
 +     * @return the message at that position
 +     */
 +    Object get(int index) {
 +        return msgs.get(index);
 +    }
 +
 +    /**
 +     * try to add a TaskMessage to a batch
 +     * @param taskMsg
 +     * @return false if the msg could not be added due to buffer size limit; true otherwise
 +     */
 +    boolean tryAdd(TaskMessage taskMsg) {
 +        if ((encoded_length + msgEncodeLength(taskMsg)) > buffer_size)
 +            return false;
 +        add(taskMsg);
 +        return true;
 +    }
 +
 +    /**
 +     * @param taskMsg message to measure (null counts as 0 bytes)
 +     * @return encoded size: 6 header bytes plus the payload length
 +     */
 +    private int msgEncodeLength(TaskMessage taskMsg) {
 +        if (taskMsg == null) return 0;
 +
 +        int size = 6; //INT + SHORT
 +        if (taskMsg.message() != null)
 +            size += taskMsg.message().length;
 +        return size;
 +    }
 +
 +    /**
 +     * Has this batch used up allowed buffer size
 +     * @return true once the encoded length has reached buffer_size
 +     */
 +    boolean isFull() {
 +        return encoded_length >= buffer_size;
 +    }
 +
 +    /**
 +     * true if this batch doesn't have any messages
 +     * @return
 +     */
 +    boolean isEmpty() {
 +        return msgs.isEmpty();
 +    }
 +
 +    /**
 +     * # of msgs in this batch
 +     * @return
 +     */
 +    int size() {
 +        return msgs.size();
 +    }
 +
 +    /**
 +     * create a buffer containing the encoding of this batch
 +     *
 +     * @return a direct buffer holding every message followed by END_OF_BATCH
 +     * @throws Exception if encoding fails (e.g. a task id above Short.MAX_VALUE)
 +     */
 +    ChannelBuffer buffer() throws Exception {
 +        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
 +
 +        for (Object msg : msgs)
 +            if (msg instanceof TaskMessage)
 +                writeTaskMessage(bout, (TaskMessage)msg);
 +            else
 +                ((ControlMessage)msg).write(bout);
 +
 +        //add a END_OF_BATCH indicator
 +        ControlMessage.EOB_MESSAGE.write(bout);
 +
 +        bout.close();
 +
 +        return bout.buffer();
 +    }
 +
 +    /**
 +     * write a TaskMessage into a stream
 +     *
 +     * Each TaskMessage is encoded as:
 +     *  task ... short(2)
 +     *  len ... int(4)
 +     *  payload ... byte[]
 +     *
 +     * @throws RuntimeException if the task id does not fit in a short
 +     */
 +    private void writeTaskMessage(ChannelBufferOutputStream bout, TaskMessage message) throws Exception {
 +        int payload_len = 0;
 +        if (message.message() != null)
 +            payload_len =  message.message().length;
 +
 +        int task_id = message.task();
 +        if (task_id > Short.MAX_VALUE)
 +            throw new RuntimeException("Task ID should not exceed "+Short.MAX_VALUE);
 +
 +        bout.writeShort((short)task_id);
 +        bout.writeInt(payload_len);
 +        if (payload_len >0)
 +            bout.write(message.message());
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
index 76776a9,0000000..3365e58
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
@@@ -1,68 -1,0 +1,85 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import backtype.storm.messaging.TaskMessage;
 +import org.jboss.netty.buffer.ChannelBuffer;
 +import org.jboss.netty.channel.Channel;
 +import org.jboss.netty.channel.ChannelHandlerContext;
 +import org.jboss.netty.handler.codec.frame.FrameDecoder;
 +
 +/**
 + * Frame decoder turning the incoming byte stream into ControlMessage and
 + * TaskMessage objects, one per decode() call.
 + */
 +public class MessageDecoder extends FrameDecoder {
 +    /*
 +     * Each ControlMessage is encoded as:
 +     *  code (<0) ... short(2)
 +     * Each TaskMessage is encoded as:
 +     *  task (>=0) ... short(2)
 +     *  len ... int(4)
 +     *  payload ... byte[]
 +     */
 +    /**
 +     * Decode the next message from the buffer.
 +     *
 +     * @return a ControlMessage, a TaskMessage (with a null payload when the
 +     *         encoded length is not positive), or null when more bytes are needed
 +     */
 +    @Override
 +    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
 +        // Make sure that we have received at least a short
 +        if (buf.readableBytes() < 2) {
 +            //need more data
 +            return null;
 +        }
 +
 +        // Mark the current buffer position before reading task/len field
 +        // because the whole frame might not be in the buffer yet.
 +        // We will reset the buffer position to the marked position if
 +        // there's not enough bytes in the buffer.
 +        buf.markReaderIndex();
 +
 +        //read the short field
 +        short code = buf.readShort();
 +
 +        //case 1: Control message
 +        ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
 +        if (ctrl_msg != null) return ctrl_msg;
 +
 +        //case 2: task Message
 +        short task = code;
 +
 +        // Make sure that we have received at least an integer (length)
 +        if (buf.readableBytes() < 4) {
 +            //need more data
 +            buf.resetReaderIndex();
 +            return null;
 +        }
 +
 +        // Read the length field.
 +        int length = buf.readInt();
 +        if (length<=0) {
 +            return new TaskMessage(task, null);
 +        }
 +
 +        // Make sure if there's enough bytes in the buffer.
 +        if (buf.readableBytes() < length) {
 +            // The whole bytes were not received yet - return null.
 +            buf.resetReaderIndex();
 +            return null;
 +        }
 +
 +        // There's enough bytes in the buffer. Copy them into an array of exactly
 +        // `length` bytes; this does not depend on the buffer being heap-backed,
 +        // unlike ChannelBuffer.array().
 +        byte[] payload = new byte[length];
 +        buf.readBytes(payload);
 +
 +        // Successfully decoded a frame.
 +        // Return a TaskMessage object
 +        return new TaskMessage(task, payload);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
index c0ac8f1,0000000..e6e65c3
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
@@@ -1,22 -1,0 +1,39 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import org.jboss.netty.channel.Channel;
 +import org.jboss.netty.channel.ChannelHandlerContext;
 +import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
 +
 +/**
 + * Outbound encoder: converts a MessageBatch or a ControlMessage into the
 + * channel buffer produced by its own buffer() method.
 + */
 +public class MessageEncoder extends OneToOneEncoder {
 +    /**
 +     * @param obj the outbound object; must be a MessageBatch or a ControlMessage
 +     * @return the encoded channel buffer
 +     * @throws RuntimeException for any other type of object
 +     */
 +    @Override
 +    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
 +        // the two supported types are unrelated, so the order of the checks is irrelevant
 +        if (obj instanceof MessageBatch) {
 +            MessageBatch batch = (MessageBatch) obj;
 +            return batch.buffer();
 +        }
 +        if (obj instanceof ControlMessage) {
 +            ControlMessage control = (ControlMessage) obj;
 +            return control.buffer();
 +        }
 +
 +        String type = obj.getClass().getName();
 +        throw new RuntimeException("Unsupported encoding of object of class "+type);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index bf6825c,0000000..ad811b0
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@@ -1,119 -1,0 +1,136 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import backtype.storm.Config;
 +import backtype.storm.messaging.IConnection;
 +import backtype.storm.messaging.TaskMessage;
 +import backtype.storm.utils.Utils;
 +import org.jboss.netty.bootstrap.ServerBootstrap;
 +import org.jboss.netty.channel.Channel;
 +import org.jboss.netty.channel.ChannelFactory;
 +import org.jboss.netty.channel.group.ChannelGroup;
 +import org.jboss.netty.channel.group.DefaultChannelGroup;
 +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.net.InetSocketAddress;
 +import java.util.Map;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.LinkedBlockingQueue;
 +
 +/**
 + * Server end of the Netty messaging layer: binds a port, queues every received
 + * TaskMessage and hands them out through recv().
 + */
 +class Server implements IConnection {
 +    private static final Logger LOG = LoggerFactory.getLogger(Server.class);
 +    @SuppressWarnings("rawtypes")
 +    Map storm_conf;
 +    int port;
 +    private LinkedBlockingQueue<TaskMessage> message_queue;
 +    // set to null by close(); also holds the bound server channel
 +    volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
 +    final ChannelFactory factory;
 +    final ServerBootstrap bootstrap;
 +
 +    /**
 +     * Configure the bootstrap from the Storm configuration and bind the port.
 +     *
 +     * @param storm_conf Storm configuration holding the STORM_MESSAGING_NETTY_* settings
 +     * @param port       local port to listen on
 +     */
 +    @SuppressWarnings("rawtypes")
 +    Server(Map storm_conf, int port) {
 +        this.storm_conf = storm_conf;
 +        this.port = port;
 +        message_queue = new LinkedBlockingQueue<TaskMessage>();
 +
 +        // Configure the server.
 +        int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
 +        int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS));
 +
 +        if (maxWorkers > 0) {
 +            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
 +        } else {
 +            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
 +        }
 +        bootstrap = new ServerBootstrap(factory);
 +        bootstrap.setOption("child.tcpNoDelay", true);
 +        bootstrap.setOption("child.receiveBufferSize", buffer_size);
 +        bootstrap.setOption("child.keepAlive", true);
 +
 +        // Set up the pipeline factory.
 +        bootstrap.setPipelineFactory(new StormServerPipelineFactory(this));
 +
 +        // Bind and start to accept incoming connections.
 +        Channel channel = bootstrap.bind(new InetSocketAddress(port));
 +        allChannels.add(channel);
 +    }
 +
 +    /**
 +     * enqueue a received message
 +     * @param message message to queue; its payload may be null (MessageDecoder
 +     *                produces a null payload for zero-length messages)
 +     * @throws InterruptedException
 +     */
 +    protected void enqueue(TaskMessage message) throws InterruptedException {
 +        message_queue.put(message);
 +        if (LOG.isDebugEnabled()) {
 +            byte[] payload = message.message();
 +            // a null payload must not turn a debug statement into an NPE
 +            int payload_size = (payload == null) ? 0 : payload.length;
 +            LOG.debug("message received with task: {}, payload size: {}", message.task(), payload_size);
 +        }
 +    }
 +
 +    /**
 +     * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
 +     *
 +     * @param flags lowest bit set means non-blocking
 +     * @return the next message; null if none is queued (non-blocking mode) or if
 +     *         the wait was interrupted (blocking mode)
 +     */
 +    public TaskMessage recv(int flags)  {
 +        if ((flags & 0x01) == 0x01) {
 +            //non-blocking
 +            return message_queue.poll();
 +        } else {
 +            try {
 +                TaskMessage request = message_queue.take();
 +                LOG.debug("request to be processed: {}", request);
 +                return request;
 +            } catch (InterruptedException e) {
 +                LOG.info("exception within msg receiving", e);
 +                // keep the interruption visible to the caller's thread
 +                Thread.currentThread().interrupt();
 +                return null;
 +            }
 +        }
 +    }
 +
 +    /**
 +     * register a newly created channel
 +     * @param channel
 +     */
 +    protected void addChannel(Channel channel) {
 +        allChannels.add(channel);
 +    }
 +
 +    /**
 +     * close a channel
 +     * @param channel
 +     */
 +    protected void closeChannel(Channel channel) {
 +        channel.close().awaitUninterruptibly();
 +        allChannels.remove(channel);
 +    }
 +
 +    /**
 +     * close all channels, and release resources.
 +     * Calling close() again after that has no effect.
 +     */
 +    public synchronized void close() {
 +        if (allChannels != null) {
 +            allChannels.close().awaitUninterruptibly();
 +            factory.releaseExternalResources();
 +            allChannels = null;
 +        }
 +    }
 +
 +    /**
 +     * Not supported on the server side.
 +     *
 +     * @throws RuntimeException always
 +     */
 +    public void send(int task, byte[] message) {
 +        throw new RuntimeException("Server connection should not send any messages");
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
index 6fbfb1c,0000000..65c36a7
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
@@@ -1,104 -1,0 +1,121 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import org.jboss.netty.channel.*;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.net.ConnectException;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +public class StormClientHandler extends SimpleChannelUpstreamHandler  {
 +    // Client-side handler: pushes the next batch from Client whenever the channel connects or a response arrives.
 +    private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
 +    private Client client;
 +    private AtomicBoolean being_closed; // set once a batch ends with CLOSE_MESSAGE; blocks further sends and reconnects
 +    long start_time; // time of the most recent batch write (construction time until the first write)
 +    
 +    StormClientHandler(Client client) {
 +        this.client = client;
 +        being_closed = new AtomicBoolean(false);
 +        start_time = System.currentTimeMillis();
 +    }
 +
 +    @Override
 +    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) {
 +        //register the newly established channel
 +        Channel channel = event.getChannel();
 +        client.setChannel(channel);
 +        LOG.debug("connection established to a remote host");
 +        //send next request; an interrupt while taking messages closes the channel
 +        try {
 +            sendRequests(channel, client.takeMessages());
 +        } catch (InterruptedException e) {
 +            channel.close();
 +        }
 +    }
 +
 +    @Override
 +    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
 +        LOG.debug("send/recv time (ms): {}", (System.currentTimeMillis() - start_time));
 +        
 +        //examine the response message from server
 +        ControlMessage msg = (ControlMessage)event.getMessage();
 +        if (msg==ControlMessage.FAILURE_RESPONSE)
 +            LOG.info("failure response:{}", msg);
 +
 +        //send next request
 +        Channel channel = event.getChannel();
 +        try {
 +            sendRequests(channel, client.takeMessages());
 +        } catch (InterruptedException e) {
 +            channel.close();
 +        }
 +    }
 +
 +    /**
 +     * Write a batch to the server. Null/empty batches are ignored; a trailing CLOSE_MESSAGE closes the client.
 +     * @param channel channel the batch is written to
 +     * @param requests batch to send
 +     */
 +    private void sendRequests(Channel channel, final MessageBatch requests) {
 +        if (requests==null || requests.size()==0 || being_closed.get()) return;
 +        //if task==CLOSE_MESSAGE for our last request, the channel is to be closed
 +        Object last_msg = requests.get(requests.size()-1);
 +        if (last_msg==ControlMessage.CLOSE_MESSAGE) {
 +            being_closed.set(true);
 +            requests.remove(last_msg);
 +        }
 +
 +        //nothing is left to send when the batch held only the close marker
 +        if (requests.isEmpty()) {
 +            if (being_closed.get())
 +                client.close_n_release();
 +            return;
 +        }
 +        //restart the clock so messageReceived logs this batch's round trip, then write the batch
 +        start_time = System.currentTimeMillis();
 +        ChannelFuture future = channel.write(requests);
 +        future.addListener(new ChannelFutureListener() {
 +            public void operationComplete(ChannelFuture future)
 +                    throws Exception {
 +                if (!future.isSuccess()) {
 +                    LOG.info("failed to send requests:", future.getCause());
 +                    future.getChannel().close();
 +                } else {
 +                    LOG.debug("{} request(s) sent", requests.size());
 +                }
 +                if (being_closed.get())
 +                    client.close_n_release();
 +            }
 +        });
 +    }
 +
 +    @Override
 +    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
 +        Throwable cause = event.getCause();
 +        if (!(cause instanceof ConnectException)) {
 +            LOG.info("Connection failed:", cause);
 +        } 
 +        if (!being_closed.get()) {
 +            client.setChannel(null);
 +            client.reconnect();
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
index 91c513a,0000000..6bad8e3
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
@@@ -1,27 -1,0 +1,44 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import org.jboss.netty.channel.ChannelPipeline;
 +import org.jboss.netty.channel.ChannelPipelineFactory;
 +import org.jboss.netty.channel.Channels;
 +
 +class StormClientPipelineFactory implements ChannelPipelineFactory {
 +    // Client handed to every StormClientHandler this factory creates.
 +    private final Client client;
 +
 +    StormClientPipelineFactory(Client client) {
 +        this.client = client;
 +    }
 +
 +    // Builds a new pipeline whose stages are, in order: "decoder", "encoder", "handler".
 +    public ChannelPipeline getPipeline() throws Exception {
 +        final ChannelPipeline stages = Channels.pipeline();
 +        final MessageDecoder inbound = new MessageDecoder();
 +        final MessageEncoder outbound = new MessageEncoder();
 +        final StormClientHandler logic = new StormClientHandler(client);
 +
 +        stages.addLast("decoder", inbound);
 +        stages.addLast("encoder", outbound);
 +        stages.addLast("handler", logic);
 +        return stages;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
index 9a5aaed,0000000..093fb61
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
@@@ -1,53 -1,0 +1,70 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import backtype.storm.messaging.TaskMessage;
 +import org.jboss.netty.channel.*;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +class StormServerHandler extends SimpleChannelUpstreamHandler  {
 +    // Server-side handler: queues incoming task messages on the Server and answers each end-of-batch marker.
 +    private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class);
 +    Server server;
 +    private AtomicInteger failure_count; // failed enqueues so far; never reset here, so one failure turns every later reply into FAILURE_RESPONSE
 +    
 +    StormServerHandler(Server server) {
 +        this.server = server;
 +        failure_count = new AtomicInteger(0);
 +    }
 +    
 +    @Override
 +    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
 +        server.addChannel(e.getChannel());
 +    }
 +    
 +    @Override
 +    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
 +        Object msg = e.getMessage();  
 +        if (msg == null) return;
 +        //end of batch? reply OK only when no enqueue has failed
 +        if (msg==ControlMessage.EOB_MESSAGE) {
 +            Channel channel = ctx.getChannel();
 +            LOG.debug("Send back response ...");
 +            if (failure_count.get()==0)
 +                channel.write(ControlMessage.OK_RESPONSE);
 +            else channel.write(ControlMessage.FAILURE_RESPONSE);
 +            return;
 +        }
 +        
 +        //enqueue the received message for processing
 +        try {
 +            server.enqueue((TaskMessage)msg);
 +        } catch (InterruptedException e1) {
 +            LOG.info("failed to enqueue a request message", e1); // log the exception, not the MessageEvent e
 +            failure_count.incrementAndGet();
 +        }
 +    }
 +
 +    @Override
 +    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
 +        server.closeChannel(e.getChannel());
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
index 56b0834,0000000..df29ba8
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
@@@ -1,28 -1,0 +1,45 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package backtype.storm.messaging.netty;
 +
 +import org.jboss.netty.channel.ChannelPipeline;
 +import org.jboss.netty.channel.ChannelPipelineFactory;
 +import org.jboss.netty.channel.Channels;
 +
 +
 +class StormServerPipelineFactory implements ChannelPipelineFactory {
 +    // Server handed to every StormServerHandler this factory creates.
 +    private final Server server;
 +
 +    StormServerPipelineFactory(Server server) {
 +        this.server = server;
 +    }
 +
 +    // Builds a new pipeline whose stages are, in order: "decoder", "encoder", "handler".
 +    public ChannelPipeline getPipeline() throws Exception {
 +        final ChannelPipeline stages = Channels.pipeline();
 +        final MessageDecoder inbound = new MessageDecoder();
 +        final MessageEncoder outbound = new MessageEncoder();
 +        final StormServerHandler logic = new StormServerHandler(server);
 +
 +        stages.addLast("decoder", inbound);
 +        stages.addLast("encoder", outbound);
 +        stages.addLast("handler", logic);
 +        return stages;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
index eefcb48,0000000..0c908c5
mode 100644,000000..100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
@@@ -1,44 -1,0 +1,59 @@@
++;; Licensed to the Apache Software Foundation (ASF) under one
++;; or more contributor license agreements.  See the NOTICE file
++;; distributed with this work for additional information
++;; regarding copyright ownership.  The ASF licenses this file
++;; to you under the Apache License, Version 2.0 (the
++;; "License"); you may not use this file except in compliance
++;; with the License.  You may obtain a copy of the License at
++;;
++;; http://www.apache.org/licenses/LICENSE-2.0
++;;
++;; Unless required by applicable law or agreed to in writing, software
++;; distributed under the License is distributed on an "AS IS" BASIS,
++;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++;; See the License for the specific language governing permissions and
++;; limitations under the License.
 +(ns backtype.storm.messaging.netty-integration-test
 +  (:use [clojure test])
 +  (:import [backtype.storm.messaging TransportFactory])
 +  (:import [backtype.storm.testing TestWordSpout TestGlobalCount])
 +  (:use [backtype.storm bootstrap testing util]))
 +
 +(bootstrap)
 +
 +(deftest test-integration
 +  (with-simulated-time-local-cluster [cluster :supervisors 4 :supervisor-slot-port-min 6710
 +                                      :daemon-conf {STORM-LOCAL-MODE-ZMQ true 
 +                                                    STORM-MESSAGING-TRANSPORT  "backtype.storm.messaging.netty.Context"
 +                                                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
 +                                                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
 +                                                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
 +                                                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
 +                                                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
 +                                                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
 +                                                    }]
 +    (let [topology (thrift/mk-topology
 +                     {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
 +                     {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
 +                                               :parallelism-hint 6)})
 +          results (complete-topology cluster
 +                                     topology
 +                                     ;; the mock source below emits 24 tuples: the count must be a
 +                                     ;; multiple of both parallelism hints (4 for the spout, 6 for the bolt)
 +                                     :storm-conf {TOPOLOGY-WORKERS 3}
 +                                     :mock-sources {"1" [["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ["a"] ["b"]
 +                                                         ]}
 +                                     )]
 +      (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
 +               (read-tuples results "2"))))))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index 12ebe5d,0000000..20914ef
mode 100644,000000..100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@@ -1,97 -1,0 +1,112 @@@
++;; Licensed to the Apache Software Foundation (ASF) under one
++;; or more contributor license agreements.  See the NOTICE file
++;; distributed with this work for additional information
++;; regarding copyright ownership.  The ASF licenses this file
++;; to you under the Apache License, Version 2.0 (the
++;; "License"); you may not use this file except in compliance
++;; with the License.  You may obtain a copy of the License at
++;;
++;; http://www.apache.org/licenses/LICENSE-2.0
++;;
++;; Unless required by applicable law or agreed to in writing, software
++;; distributed under the License is distributed on an "AS IS" BASIS,
++;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++;; See the License for the specific language governing permissions and
++;; limitations under the License.
 +(ns backtype.storm.messaging.netty-unit-test
 +  (:use [clojure test])
 +  (:import [backtype.storm.messaging TransportFactory])
 +  (:use [backtype.storm bootstrap testing util]))
 +
 +(bootstrap)
 +
 +(def port 6700) ; port every test in this file binds the server to and connects the client to
 +(def task 1) ; task id attached to every message the tests send
 +
 +(deftest test-basic
 +  ;; a single small message must reach the server with its task id and bytes intact
 +  (let [payload "0123456789abcdefghijklmnopqrstuvwxyz"
 +        conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
 +              STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
 +              STORM-MESSAGING-NETTY-MAX-RETRIES 10
 +              STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
 +              STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
 +              STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
 +              STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1}
 +        ctx (TransportFactory/makeContext conf)
 +        srv (.bind ctx nil port)
 +        cli (.connect ctx nil "localhost" port)
 +        _ (.send cli task (.getBytes payload))
 +        received (.recv srv 0)]
 +    (is (= task (.task received)))
 +    (is (= payload (String. (.message received))))
 +    (.close cli)
 +    (.close srv)
 +    (.term ctx)))
 +
 +(deftest test-large-msg
 +  ;; a 2,048,000-character message, far larger than the 102400 buffer size, must arrive intact
 +  (let [req_msg (apply str (repeat 2048000 \c)) ; \c is the character literal; 'c' reads as the quoted symbol c'
 +        storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
 +                    STORM-MESSAGING-NETTY-BUFFER-SIZE 102400
 +                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
 +                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
 +                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
 +                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
 +                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1}
 +        context (TransportFactory/makeContext storm-conf)
 +        server (.bind context nil port)
 +        client (.connect context nil "localhost" port)
 +        _ (.send client task (.getBytes req_msg))
 +        resp (.recv server 0)]
 +    (is (= task (.task resp)))
 +    (is (= req_msg (String. (.message resp))))
 +    (.close client)
 +    (.close server)
 +    (.term context)))
 +    
 +(deftest test-server-delayed
 +  ;; the client connects and sends one second before the server binds; the message must still arrive
 +  (let [payload "0123456789abcdefghijklmnopqrstuvwxyz"
 +        conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
 +              STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
 +              STORM-MESSAGING-NETTY-MAX-RETRIES 10
 +              STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
 +              STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
 +              STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
 +              STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1}
 +        ctx (TransportFactory/makeContext conf)
 +        cli (.connect ctx nil "localhost" port)
 +        _ (.send cli task (.getBytes payload))
 +        _ (Thread/sleep 1000)
 +        srv (.bind ctx nil port)
 +        received (.recv srv 0)]
 +    (is (= task (.task received)))
 +    (is (= payload (String. (.message received))))
 +    (.close cli)
 +    (.close srv)
 +    (.term ctx)))
 +
 +(deftest test-batch
 +  ;; the numbers 1..99999 are sent as strings and must be received in the same order
 +  (let [conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
 +              STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
 +              STORM-MESSAGING-NETTY-MAX-RETRIES 10
 +              STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
 +              STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
 +              STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
 +              STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1}
 +        ctx (TransportFactory/makeContext conf)
 +        srv (.bind ctx nil port)
 +        cli (.connect ctx nil "localhost" port)
 +        ids (range 1 100000)]
 +    ;; phase 1: queue every message on the client
 +    (doseq [n ids]
 +      (.send cli task (.getBytes (str n))))
 +    ;; phase 2: blocking receive, one message per expected number
 +    (doseq [n ids]
 +      (let [received (.recv srv 0)]
 +        (is (= (str n) (String. (.message received))))))
 +    (.close cli)
 +    (.close srv)
 +    (.term ctx)))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/test/clj/backtype/storm/messaging_test.clj
----------------------------------------------------------------------