You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2013/12/21 03:26:22 UTC
[3/4] git commit: Merge branch 'master' into netty-default
Merge branch 'master' into netty-default
Conflicts:
bin/install_zmq.sh
storm-core/src/clj/backtype/storm/messaging/zmq.clj
storm-core/src/clj/zilch/mq.clj
storm-core/test/clj/zilch/test/mq.clj
storm-netty/project.clj
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/962d5207
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/962d5207
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/962d5207
Branch: refs/heads/master
Commit: 962d520777368709cf30d6214719bfa1b81bbd4c
Parents: b63ed13 7e40f9f
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Dec 16 13:21:19 2013 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Mon Dec 16 13:21:19 2013 -0500
----------------------------------------------------------------------
LICENSE | 285 +++++++++++++++++++
LICENSE.html | 261 -----------------
NOTICE | 12 +-
README.markdown | 23 +-
VERSION | 2 +-
bin/build_modules.sh | 16 ++
bin/build_release.sh | 17 ++
bin/javadoc.sh | 16 ++
bin/storm | 16 ++
bin/to_maven.sh | 17 ++
conf/defaults.yaml | 18 ++
conf/jaas_digest.conf | 18 ++
conf/logback.xml | 17 +-
conf/storm.yaml.example | 16 ++
logback/cluster.xml | 18 +-
project.clj | 15 +
storm-console-logging/project.clj | 15 +
storm-core/project.clj | 21 +-
.../src/clj/backtype/storm/LocalCluster.clj | 15 +
storm-core/src/clj/backtype/storm/LocalDRPC.clj | 15 +
storm-core/src/clj/backtype/storm/bootstrap.clj | 15 +
storm-core/src/clj/backtype/storm/clojure.clj | 15 +
storm-core/src/clj/backtype/storm/cluster.clj | 15 +
.../src/clj/backtype/storm/command/activate.clj | 15 +
.../clj/backtype/storm/command/config_value.clj | 15 +
.../clj/backtype/storm/command/deactivate.clj | 15 +
.../backtype/storm/command/dev_zookeeper.clj | 15 +
.../backtype/storm/command/kill_topology.clj | 15 +
.../src/clj/backtype/storm/command/list.clj | 15 +
.../clj/backtype/storm/command/rebalance.clj | 15 +
.../backtype/storm/command/shell_submission.clj | 15 +
storm-core/src/clj/backtype/storm/config.clj | 15 +
.../src/clj/backtype/storm/daemon/acker.clj | 15 +
.../backtype/storm/daemon/builtin_metrics.clj | 15 +
.../src/clj/backtype/storm/daemon/common.clj | 15 +
.../src/clj/backtype/storm/daemon/drpc.clj | 19 +-
.../src/clj/backtype/storm/daemon/executor.clj | 15 +
.../src/clj/backtype/storm/daemon/logviewer.clj | 15 +
.../src/clj/backtype/storm/daemon/nimbus.clj | 15 +
.../clj/backtype/storm/daemon/supervisor.clj | 15 +
.../src/clj/backtype/storm/daemon/task.clj | 15 +
.../src/clj/backtype/storm/daemon/worker.clj | 15 +
storm-core/src/clj/backtype/storm/disruptor.clj | 15 +
storm-core/src/clj/backtype/storm/event.clj | 15 +
storm-core/src/clj/backtype/storm/log.clj | 15 +
.../src/clj/backtype/storm/messaging/loader.clj | 15 +
.../src/clj/backtype/storm/messaging/local.clj | 15 +
.../src/clj/backtype/storm/metric/testing.clj | 15 +
.../clj/backtype/storm/process_simulator.clj | 15 +
.../storm/scheduler/DefaultScheduler.clj | 15 +
.../backtype/storm/scheduler/EvenScheduler.clj | 15 +
.../storm/scheduler/IsolationScheduler.clj | 15 +
storm-core/src/clj/backtype/storm/stats.clj | 15 +
storm-core/src/clj/backtype/storm/testing.clj | 15 +
storm-core/src/clj/backtype/storm/testing4j.clj | 15 +
storm-core/src/clj/backtype/storm/thrift.clj | 15 +
storm-core/src/clj/backtype/storm/timer.clj | 15 +
storm-core/src/clj/backtype/storm/tuple.clj | 15 +
storm-core/src/clj/backtype/storm/ui/core.clj | 15 +
.../src/clj/backtype/storm/ui/helpers.clj | 15 +
storm-core/src/clj/backtype/storm/util.clj | 15 +
storm-core/src/clj/backtype/storm/zookeeper.clj | 15 +
storm-core/src/clj/storm/trident/testing.clj | 15 +
storm-core/src/dev/resources/tester_bolt.py | 18 ++
storm-core/src/dev/resources/tester_bolt.rb | 18 ++
storm-core/src/dev/resources/tester_spout.py | 18 ++
storm-core/src/dev/resources/tester_spout.rb | 17 ++
storm-core/src/genthrift.sh | 16 ++
storm-core/src/jvm/backtype/storm/Config.java | 17 ++
.../jvm/backtype/storm/ConfigValidation.java | 17 ++
.../src/jvm/backtype/storm/Constants.java | 17 ++
.../src/jvm/backtype/storm/ILocalCluster.java | 17 ++
.../src/jvm/backtype/storm/ILocalDRPC.java | 17 ++
.../src/jvm/backtype/storm/StormSubmitter.java | 17 ++
.../jvm/backtype/storm/clojure/ClojureBolt.java | 17 ++
.../backtype/storm/clojure/ClojureSpout.java | 17 ++
.../backtype/storm/clojure/RichShellBolt.java | 17 ++
.../backtype/storm/clojure/RichShellSpout.java | 17 ++
.../storm/coordination/BatchBoltExecutor.java | 17 ++
.../coordination/BatchOutputCollector.java | 17 ++
.../coordination/BatchOutputCollectorImpl.java | 17 ++
.../coordination/BatchSubtopologyBuilder.java | 17 ++
.../storm/coordination/CoordinatedBolt.java | 17 ++
.../backtype/storm/coordination/IBatchBolt.java | 17 ++
.../jvm/backtype/storm/daemon/Shutdownable.java | 17 ++
.../storm/drpc/DRPCInvocationsClient.java | 17 ++
.../src/jvm/backtype/storm/drpc/DRPCSpout.java | 17 ++
.../src/jvm/backtype/storm/drpc/JoinResult.java | 17 ++
.../jvm/backtype/storm/drpc/KeyedFairBolt.java | 17 ++
.../storm/drpc/LinearDRPCInputDeclarer.java | 17 ++
.../storm/drpc/LinearDRPCTopologyBuilder.java | 17 ++
.../jvm/backtype/storm/drpc/PrepareRequest.java | 17 ++
.../jvm/backtype/storm/drpc/ReturnResults.java | 17 ++
.../storm/generated/AlreadyAliveException.java | 17 ++
.../src/jvm/backtype/storm/generated/Bolt.java | 17 ++
.../jvm/backtype/storm/generated/BoltStats.java | 17 ++
.../storm/generated/ClusterSummary.java | 17 ++
.../storm/generated/ComponentCommon.java | 17 ++
.../storm/generated/ComponentObject.java | 17 ++
.../storm/generated/DRPCExecutionException.java | 17 ++
.../backtype/storm/generated/DRPCRequest.java | 17 ++
.../storm/generated/DistributedRPC.java | 17 ++
.../generated/DistributedRPCInvocations.java | 17 ++
.../jvm/backtype/storm/generated/ErrorInfo.java | 17 ++
.../backtype/storm/generated/ExecutorInfo.java | 17 ++
.../storm/generated/ExecutorSpecificStats.java | 17 ++
.../backtype/storm/generated/ExecutorStats.java | 17 ++
.../storm/generated/ExecutorSummary.java | 17 ++
.../storm/generated/GlobalStreamId.java | 17 ++
.../jvm/backtype/storm/generated/Grouping.java | 17 ++
.../generated/InvalidTopologyException.java | 17 ++
.../backtype/storm/generated/JavaObject.java | 17 ++
.../backtype/storm/generated/JavaObjectArg.java | 17 ++
.../backtype/storm/generated/KillOptions.java | 17 ++
.../jvm/backtype/storm/generated/Nimbus.java | 17 ++
.../storm/generated/NotAliveException.java | 17 ++
.../backtype/storm/generated/NullStruct.java | 17 ++
.../storm/generated/RebalanceOptions.java | 17 ++
.../storm/generated/ShellComponent.java | 17 ++
.../jvm/backtype/storm/generated/SpoutSpec.java | 17 ++
.../backtype/storm/generated/SpoutStats.java | 17 ++
.../storm/generated/StateSpoutSpec.java | 17 ++
.../backtype/storm/generated/StormTopology.java | 17 ++
.../backtype/storm/generated/StreamInfo.java | 17 ++
.../backtype/storm/generated/SubmitOptions.java | 17 ++
.../storm/generated/SupervisorSummary.java | 17 ++
.../backtype/storm/generated/TopologyInfo.java | 17 ++
.../storm/generated/TopologyInitialStatus.java | 17 ++
.../storm/generated/TopologySummary.java | 17 ++
.../storm/grouping/CustomStreamGrouping.java | 17 ++
.../jvm/backtype/storm/hooks/BaseTaskHook.java | 17 ++
.../src/jvm/backtype/storm/hooks/ITaskHook.java | 17 ++
.../backtype/storm/hooks/info/BoltAckInfo.java | 17 ++
.../storm/hooks/info/BoltExecuteInfo.java | 17 ++
.../backtype/storm/hooks/info/BoltFailInfo.java | 17 ++
.../jvm/backtype/storm/hooks/info/EmitInfo.java | 17 ++
.../backtype/storm/hooks/info/SpoutAckInfo.java | 17 ++
.../storm/hooks/info/SpoutFailInfo.java | 17 ++
.../backtype/storm/messaging/IConnection.java | 17 ++
.../jvm/backtype/storm/messaging/IContext.java | 17 ++
.../backtype/storm/messaging/TaskMessage.java | 17 ++
.../storm/messaging/TransportFactory.java | 17 ++
.../backtype/storm/messaging/netty/Client.java | 17 ++
.../backtype/storm/messaging/netty/Context.java | 17 ++
.../storm/messaging/netty/ControlMessage.java | 17 ++
.../storm/messaging/netty/MessageBatch.java | 17 ++
.../storm/messaging/netty/MessageDecoder.java | 17 ++
.../storm/messaging/netty/MessageEncoder.java | 17 ++
.../backtype/storm/messaging/netty/Server.java | 17 ++
.../messaging/netty/StormClientHandler.java | 17 ++
.../netty/StormClientPipelineFactory.java | 17 ++
.../messaging/netty/StormServerHandler.java | 17 ++
.../netty/StormServerPipelineFactory.java | 17 ++
.../storm/metric/LoggingMetricsConsumer.java | 17 ++
.../storm/metric/MetricsConsumerBolt.java | 17 ++
.../jvm/backtype/storm/metric/SystemBolt.java | 17 ++
.../storm/metric/api/AssignableMetric.java | 17 ++
.../storm/metric/api/CombinedMetric.java | 17 ++
.../backtype/storm/metric/api/CountMetric.java | 17 ++
.../backtype/storm/metric/api/ICombiner.java | 17 ++
.../jvm/backtype/storm/metric/api/IMetric.java | 17 ++
.../storm/metric/api/IMetricsConsumer.java | 17 ++
.../jvm/backtype/storm/metric/api/IReducer.java | 17 ++
.../storm/metric/api/IStatefulObject.java | 17 ++
.../backtype/storm/metric/api/MeanReducer.java | 17 ++
.../storm/metric/api/MultiCountMetric.java | 17 ++
.../storm/metric/api/MultiReducedMetric.java | 17 ++
.../storm/metric/api/ReducedMetric.java | 17 ++
.../backtype/storm/metric/api/StateMetric.java | 17 ++
.../storm/nimbus/DefaultTopologyValidator.java | 17 ++
.../storm/nimbus/ITopologyValidator.java | 17 ++
.../backtype/storm/planner/CompoundSpout.java | 17 ++
.../backtype/storm/planner/CompoundTask.java | 17 ++
.../jvm/backtype/storm/planner/TaskBundle.java | 17 ++
.../jvm/backtype/storm/scheduler/Cluster.java | 17 ++
.../storm/scheduler/ExecutorDetails.java | 91 +++---
.../jvm/backtype/storm/scheduler/INimbus.java | 17 ++
.../backtype/storm/scheduler/IScheduler.java | 17 ++
.../backtype/storm/scheduler/ISupervisor.java | 17 ++
.../storm/scheduler/SchedulerAssignment.java | 97 ++++---
.../scheduler/SchedulerAssignmentImpl.java | 201 +++++++------
.../storm/scheduler/SupervisorDetails.java | 17 ++
.../backtype/storm/scheduler/Topologies.java | 97 ++++---
.../storm/scheduler/TopologyDetails.java | 17 ++
.../backtype/storm/scheduler/WorkerSlot.java | 17 ++
.../backtype/storm/security/auth/AuthUtils.java | 17 ++
.../storm/security/auth/IAuthorizer.java | 17 ++
.../storm/security/auth/ITransportPlugin.java | 17 ++
.../storm/security/auth/ReqContext.java | 17 ++
.../security/auth/SaslTransportPlugin.java | 17 ++
.../security/auth/SimpleTransportPlugin.java | 17 ++
.../storm/security/auth/ThriftClient.java | 17 ++
.../storm/security/auth/ThriftServer.java | 17 ++
.../auth/authorizer/DenyAuthorizer.java | 17 ++
.../auth/authorizer/NoopAuthorizer.java | 17 ++
.../auth/digest/ClientCallbackHandler.java | 17 ++
.../auth/digest/DigestSaslTransportPlugin.java | 17 ++
.../auth/digest/ServerCallbackHandler.java | 17 ++
.../serialization/BlowfishTupleSerializer.java | 17 ++
.../storm/serialization/DefaultKryoFactory.java | 17 ++
.../storm/serialization/IKryoDecorator.java | 17 ++
.../storm/serialization/IKryoFactory.java | 17 ++
.../storm/serialization/ITupleDeserializer.java | 17 ++
.../storm/serialization/ITupleSerializer.java | 17 ++
.../serialization/KryoTupleDeserializer.java | 17 ++
.../serialization/KryoTupleSerializer.java | 17 ++
.../serialization/KryoValuesDeserializer.java | 17 ++
.../serialization/KryoValuesSerializer.java | 17 ++
.../serialization/SerializableSerializer.java | 17 ++
.../serialization/SerializationFactory.java | 17 ++
.../types/ArrayListSerializer.java | 17 ++
.../serialization/types/HashMapSerializer.java | 17 ++
.../serialization/types/HashSetSerializer.java | 17 ++
.../types/ListDelegateSerializer.java | 17 ++
.../storm/spout/IMultiSchemableSpout.java | 17 ++
.../backtype/storm/spout/ISchemableSpout.java | 17 ++
.../src/jvm/backtype/storm/spout/ISpout.java | 17 ++
.../storm/spout/ISpoutOutputCollector.java | 17 ++
.../storm/spout/ISpoutWaitStrategy.java | 17 ++
.../jvm/backtype/storm/spout/MultiScheme.java | 17 ++
.../storm/spout/NothingEmptyEmitStrategy.java | 17 ++
.../backtype/storm/spout/RawMultiScheme.java | 17 ++
.../src/jvm/backtype/storm/spout/RawScheme.java | 17 ++
.../src/jvm/backtype/storm/spout/Scheme.java | 17 ++
.../storm/spout/SchemeAsMultiScheme.java | 17 ++
.../jvm/backtype/storm/spout/ShellSpout.java | 17 ++
.../storm/spout/SleepSpoutWaitStrategy.java | 17 ++
.../storm/spout/SpoutOutputCollector.java | 17 ++
.../jvm/backtype/storm/state/IStateSpout.java | 17 ++
.../storm/state/IStateSpoutOutputCollector.java | 17 ++
.../backtype/storm/state/ISubscribedState.java | 17 ++
.../state/ISynchronizeOutputCollector.java | 17 ++
.../storm/state/StateSpoutOutputCollector.java | 17 ++
.../storm/state/SynchronizeOutputCollector.java | 17 ++
.../storm/task/GeneralTopologyContext.java | 17 ++
.../src/jvm/backtype/storm/task/IBolt.java | 17 ++
.../jvm/backtype/storm/task/IErrorReporter.java | 17 ++
.../backtype/storm/task/IMetricsContext.java | 17 ++
.../backtype/storm/task/IOutputCollector.java | 17 ++
.../backtype/storm/task/OutputCollector.java | 17 ++
.../src/jvm/backtype/storm/task/ShellBolt.java | 17 ++
.../backtype/storm/task/TopologyContext.java | 17 ++
.../storm/task/WorkerTopologyContext.java | 17 ++
.../backtype/storm/testing/AckFailDelegate.java | 17 ++
.../storm/testing/AckFailMapTracker.java | 17 ++
.../jvm/backtype/storm/testing/AckTracker.java | 17 ++
.../backtype/storm/testing/BatchNumberList.java | 17 ++
.../storm/testing/BatchProcessWord.java | 17 ++
.../backtype/storm/testing/BatchRepeatA.java | 17 ++
.../jvm/backtype/storm/testing/BoltTracker.java | 17 ++
.../storm/testing/CompleteTopologyParam.java | 117 ++++----
.../storm/testing/CountingBatchBolt.java | 17 ++
.../storm/testing/CountingCommitBolt.java | 17 ++
.../jvm/backtype/storm/testing/FeederSpout.java | 17 ++
.../jvm/backtype/storm/testing/FixedTuple.java | 17 ++
.../backtype/storm/testing/FixedTupleSpout.java | 17 ++
.../backtype/storm/testing/IdentityBolt.java | 17 ++
.../storm/testing/KeyedCountingBatchBolt.java | 17 ++
.../testing/KeyedCountingCommitterBolt.java | 17 ++
.../storm/testing/KeyedSummingBatchBolt.java | 17 ++
.../storm/testing/MemoryTransactionalSpout.java | 17 ++
.../testing/MemoryTransactionalSpoutMeta.java | 17 ++
.../backtype/storm/testing/MkClusterParam.java | 97 ++++---
.../backtype/storm/testing/MkTupleParam.java | 85 +++---
.../backtype/storm/testing/MockedSources.java | 103 ++++---
.../jvm/backtype/storm/testing/NGrouping.java | 17 ++
.../storm/testing/NonRichBoltTracker.java | 17 ++
.../testing/OpaqueMemoryTransactionalSpout.java | 17 ++
.../storm/testing/PrepareBatchBolt.java | 17 ++
.../backtype/storm/testing/SpoutTracker.java | 17 ++
.../storm/testing/TestAggregatesCounter.java | 17 ++
.../backtype/storm/testing/TestConfBolt.java | 17 ++
.../backtype/storm/testing/TestGlobalCount.java | 17 ++
.../src/jvm/backtype/storm/testing/TestJob.java | 65 +++--
.../storm/testing/TestKryoDecorator.java | 17 ++
.../backtype/storm/testing/TestPlannerBolt.java | 17 ++
.../storm/testing/TestPlannerSpout.java | 17 ++
.../backtype/storm/testing/TestSerObject.java | 17 ++
.../backtype/storm/testing/TestWordCounter.java | 17 ++
.../backtype/storm/testing/TestWordSpout.java | 17 ++
.../backtype/storm/testing/TrackedTopology.java | 51 ++--
.../storm/testing/TupleCaptureBolt.java | 17 ++
.../topology/BaseConfigurationDeclarer.java | 17 ++
.../storm/topology/BasicBoltExecutor.java | 17 ++
.../storm/topology/BasicOutputCollector.java | 17 ++
.../backtype/storm/topology/BoltDeclarer.java | 17 ++
.../ComponentConfigurationDeclarer.java | 17 ++
.../storm/topology/FailedException.java | 17 ++
.../jvm/backtype/storm/topology/IBasicBolt.java | 17 ++
.../storm/topology/IBasicOutputCollector.java | 17 ++
.../jvm/backtype/storm/topology/IComponent.java | 17 ++
.../jvm/backtype/storm/topology/IRichBolt.java | 17 ++
.../jvm/backtype/storm/topology/IRichSpout.java | 17 ++
.../storm/topology/IRichStateSpout.java | 17 ++
.../backtype/storm/topology/InputDeclarer.java | 17 ++
.../storm/topology/OutputFieldsDeclarer.java | 17 ++
.../storm/topology/OutputFieldsGetter.java | 17 ++
.../storm/topology/ReportedFailedException.java | 17 ++
.../backtype/storm/topology/SpoutDeclarer.java | 17 ++
.../storm/topology/TopologyBuilder.java | 17 ++
.../storm/topology/base/BaseBasicBolt.java | 17 ++
.../storm/topology/base/BaseBatchBolt.java | 17 ++
.../storm/topology/base/BaseComponent.java | 17 ++
...BaseOpaquePartitionedTransactionalSpout.java | 17 ++
.../base/BasePartitionedTransactionalSpout.java | 17 ++
.../storm/topology/base/BaseRichBolt.java | 17 ++
.../storm/topology/base/BaseRichSpout.java | 17 ++
.../topology/base/BaseTransactionalBolt.java | 17 ++
.../topology/base/BaseTransactionalSpout.java | 17 ++
.../storm/transactional/ICommitter.java | 17 ++
.../ICommitterTransactionalSpout.java | 17 ++
.../transactional/ITransactionalSpout.java | 17 ++
.../storm/transactional/TransactionAttempt.java | 17 ++
.../TransactionalSpoutBatchExecutor.java | 17 ++
.../TransactionalSpoutCoordinator.java | 17 ++
.../TransactionalTopologyBuilder.java | 17 ++
.../IOpaquePartitionedTransactionalSpout.java | 17 ++
.../IPartitionedTransactionalSpout.java | 17 ++
...uePartitionedTransactionalSpoutExecutor.java | 17 ++
.../PartitionedTransactionalSpoutExecutor.java | 17 ++
.../state/RotatingTransactionalState.java | 17 ++
.../transactional/state/TransactionalState.java | 17 ++
.../src/jvm/backtype/storm/tuple/Fields.java | 17 ++
.../src/jvm/backtype/storm/tuple/MessageId.java | 17 ++
.../src/jvm/backtype/storm/tuple/Tuple.java | 17 ++
.../src/jvm/backtype/storm/tuple/TupleImpl.java | 17 ++
.../src/jvm/backtype/storm/tuple/Values.java | 17 ++
.../storm/utils/BufferFileInputStream.java | 17 ++
.../backtype/storm/utils/CRC32OutputStream.java | 17 ++
.../backtype/storm/utils/ClojureTimerTask.java | 17 ++
.../src/jvm/backtype/storm/utils/Container.java | 17 ++
.../jvm/backtype/storm/utils/DRPCClient.java | 17 ++
.../backtype/storm/utils/DisruptorQueue.java | 17 ++
.../storm/utils/IndifferentAccessMap.java | 17 ++
.../backtype/storm/utils/InprocMessaging.java | 17 ++
.../storm/utils/KeyedRoundRobinQueue.java | 17 ++
.../jvm/backtype/storm/utils/ListDelegate.java | 17 ++
.../jvm/backtype/storm/utils/LocalState.java | 17 ++
.../jvm/backtype/storm/utils/MutableInt.java | 17 ++
.../jvm/backtype/storm/utils/MutableLong.java | 17 ++
.../jvm/backtype/storm/utils/MutableObject.java | 17 ++
.../jvm/backtype/storm/utils/NimbusClient.java | 17 ++
.../storm/utils/RegisteredGlobalState.java | 17 ++
.../jvm/backtype/storm/utils/RotatingMap.java | 17 ++
.../backtype/storm/utils/ServiceRegistry.java | 17 ++
.../jvm/backtype/storm/utils/ShellProcess.java | 17 ++
.../storm/utils/ThriftTopologyUtils.java | 17 ++
.../src/jvm/backtype/storm/utils/Time.java | 17 ++
.../jvm/backtype/storm/utils/TimeCacheMap.java | 17 ++
.../src/jvm/backtype/storm/utils/Utils.java | 17 ++
.../backtype/storm/utils/VersionedStore.java | 17 ++
.../storm/utils/WindowedTimeThrottler.java | 17 ++
.../jvm/backtype/storm/utils/WritableUtils.java | 17 ++
.../backtype/storm/utils/ZookeeperAuthInfo.java | 17 ++
storm-core/src/jvm/storm/trident/JoinType.java | 17 ++
storm-core/src/jvm/storm/trident/Stream.java | 17 ++
.../src/jvm/storm/trident/TridentState.java | 17 ++
.../src/jvm/storm/trident/TridentTopology.java | 17 ++
.../trident/drpc/ReturnResultsReducer.java | 17 ++
.../fluent/ChainedAggregatorDeclarer.java | 17 ++
.../fluent/ChainedFullAggregatorDeclarer.java | 17 ++
.../ChainedPartitionAggregatorDeclarer.java | 17 ++
.../trident/fluent/GlobalAggregationScheme.java | 17 ++
.../jvm/storm/trident/fluent/GroupedStream.java | 17 ++
.../trident/fluent/IAggregatableStream.java | 17 ++
.../fluent/IChainedAggregatorDeclarer.java | 17 ++
.../jvm/storm/trident/fluent/UniqueIdGen.java | 17 ++
.../jvm/storm/trident/graph/GraphGrouper.java | 17 ++
.../src/jvm/storm/trident/graph/Group.java | 17 ++
.../jvm/storm/trident/operation/Aggregator.java | 17 ++
.../jvm/storm/trident/operation/Assembly.java | 17 ++
.../storm/trident/operation/BaseAggregator.java | 17 ++
.../jvm/storm/trident/operation/BaseFilter.java | 17 ++
.../storm/trident/operation/BaseFunction.java | 17 ++
.../trident/operation/BaseMultiReducer.java | 17 ++
.../storm/trident/operation/BaseOperation.java | 17 ++
.../trident/operation/CombinerAggregator.java | 17 ++
.../storm/trident/operation/EachOperation.java | 17 ++
.../src/jvm/storm/trident/operation/Filter.java | 17 ++
.../jvm/storm/trident/operation/Function.java | 17 ++
.../trident/operation/GroupedMultiReducer.java | 17 ++
.../storm/trident/operation/MultiReducer.java | 17 ++
.../jvm/storm/trident/operation/Operation.java | 17 ++
.../trident/operation/ReducerAggregator.java | 17 ++
.../trident/operation/TridentCollector.java | 17 ++
.../operation/TridentMultiReducerContext.java | 17 ++
.../operation/TridentOperationContext.java | 17 ++
.../storm/trident/operation/builtin/Count.java | 17 ++
.../storm/trident/operation/builtin/Debug.java | 17 ++
.../storm/trident/operation/builtin/Equals.java | 17 ++
.../trident/operation/builtin/FilterNull.java | 17 ++
.../storm/trident/operation/builtin/FirstN.java | 17 ++
.../storm/trident/operation/builtin/MapGet.java | 17 ++
.../storm/trident/operation/builtin/Negate.java | 17 ++
.../trident/operation/builtin/SnapshotGet.java | 17 ++
.../storm/trident/operation/builtin/Sum.java | 17 ++
.../operation/builtin/TupleCollectionGet.java | 17 ++
.../operation/impl/CaptureCollector.java | 17 ++
.../operation/impl/ChainedAggregatorImpl.java | 17 ++
.../trident/operation/impl/ChainedResult.java | 17 ++
.../operation/impl/CombinerAggStateUpdater.java | 17 ++
.../impl/CombinerAggregatorCombineImpl.java | 17 ++
.../impl/CombinerAggregatorInitImpl.java | 17 ++
.../trident/operation/impl/FilterExecutor.java | 17 ++
.../operation/impl/GlobalBatchToPartition.java | 17 ++
.../trident/operation/impl/GroupCollector.java | 17 ++
.../operation/impl/GroupedAggregator.java | 17 ++
.../impl/GroupedMultiReducerExecutor.java | 17 ++
.../operation/impl/IdentityMultiReducer.java | 17 ++
.../impl/IndexHashBatchToPartition.java | 17 ++
.../operation/impl/JoinerMultiReducer.java | 17 ++
.../operation/impl/ReducerAggStateUpdater.java | 17 ++
.../operation/impl/ReducerAggregatorImpl.java | 17 ++
.../storm/trident/operation/impl/Result.java | 17 ++
.../operation/impl/SingleEmitAggregator.java | 17 ++
.../trident/operation/impl/TrueFilter.java | 17 ++
.../storm/trident/partition/GlobalGrouping.java | 17 ++
.../trident/partition/IdentityGrouping.java | 17 ++
.../trident/partition/IndexHashGrouping.java | 17 ++
.../storm/trident/planner/BridgeReceiver.java | 17 ++
.../src/jvm/storm/trident/planner/Node.java | 17 ++
.../storm/trident/planner/NodeStateInfo.java | 17 ++
.../storm/trident/planner/PartitionNode.java | 17 ++
.../storm/trident/planner/ProcessorContext.java | 17 ++
.../storm/trident/planner/ProcessorNode.java | 17 ++
.../jvm/storm/trident/planner/SpoutNode.java | 17 ++
.../storm/trident/planner/SubtopologyBolt.java | 17 ++
.../storm/trident/planner/TridentProcessor.java | 17 ++
.../storm/trident/planner/TupleReceiver.java | 17 ++
.../planner/processor/AggregateProcessor.java | 17 ++
.../planner/processor/AppendCollector.java | 17 ++
.../planner/processor/EachProcessor.java | 17 ++
.../planner/processor/FreshCollector.java | 17 ++
.../processor/MultiReducerProcessor.java | 17 ++
.../processor/PartitionPersistProcessor.java | 17 ++
.../planner/processor/ProjectedProcessor.java | 17 ++
.../planner/processor/StateQueryProcessor.java | 17 ++
.../planner/processor/TridentContext.java | 17 ++
.../storm/trident/spout/BatchSpoutExecutor.java | 17 ++
.../src/jvm/storm/trident/spout/IBatchID.java | 17 ++
.../jvm/storm/trident/spout/IBatchSpout.java | 17 ++
.../trident/spout/ICommitterTridentSpout.java | 17 ++
.../spout/IOpaquePartitionedTridentSpout.java | 17 ++
.../trident/spout/IPartitionedTridentSpout.java | 17 ++
.../storm/trident/spout/ISpoutPartition.java | 17 ++
.../jvm/storm/trident/spout/ITridentSpout.java | 17 ++
.../OpaquePartitionedTridentSpoutExecutor.java | 17 ++
.../spout/PartitionedTridentSpoutExecutor.java | 17 ++
.../trident/spout/RichSpoutBatchExecutor.java | 17 ++
.../storm/trident/spout/RichSpoutBatchId.java | 17 ++
.../spout/RichSpoutBatchIdSerializer.java | 17 ++
.../trident/spout/RichSpoutBatchTriggerer.java | 17 ++
.../trident/spout/TridentSpoutCoordinator.java | 17 ++
.../trident/spout/TridentSpoutExecutor.java | 17 ++
.../storm/trident/state/BaseQueryFunction.java | 17 ++
.../storm/trident/state/BaseStateUpdater.java | 17 ++
.../trident/state/CombinerValueUpdater.java | 17 ++
.../storm/trident/state/ITupleCollection.java | 17 ++
.../state/JSONNonTransactionalSerializer.java | 17 ++
.../trident/state/JSONOpaqueSerializer.java | 17 ++
.../state/JSONTransactionalSerializer.java | 17 ++
.../jvm/storm/trident/state/OpaqueValue.java | 17 ++
.../jvm/storm/trident/state/QueryFunction.java | 17 ++
.../jvm/storm/trident/state/ReadOnlyState.java | 17 ++
.../trident/state/ReducerValueUpdater.java | 17 ++
.../src/jvm/storm/trident/state/Serializer.java | 17 ++
.../src/jvm/storm/trident/state/State.java | 17 ++
.../jvm/storm/trident/state/StateFactory.java | 17 ++
.../src/jvm/storm/trident/state/StateSpec.java | 17 ++
.../src/jvm/storm/trident/state/StateType.java | 17 ++
.../jvm/storm/trident/state/StateUpdater.java | 17 ++
.../storm/trident/state/TransactionalValue.java | 17 ++
.../jvm/storm/trident/state/ValueUpdater.java | 17 ++
.../trident/state/map/CachedBatchReadsMap.java | 17 ++
.../jvm/storm/trident/state/map/CachedMap.java | 17 ++
.../storm/trident/state/map/IBackingMap.java | 17 ++
.../state/map/MapCombinerAggStateUpdater.java | 17 ++
.../state/map/MapReducerAggStateUpdater.java | 17 ++
.../jvm/storm/trident/state/map/MapState.java | 17 ++
.../state/map/MicroBatchIBackingMap.java | 17 ++
.../trident/state/map/NonTransactionalMap.java | 17 ++
.../jvm/storm/trident/state/map/OpaqueMap.java | 17 ++
.../trident/state/map/ReadOnlyMapState.java | 17 ++
.../trident/state/map/SnapshottableMap.java | 17 ++
.../trident/state/map/TransactionalMap.java | 17 ++
.../state/snapshot/ReadOnlySnapshottable.java | 17 ++
.../trident/state/snapshot/Snapshottable.java | 17 ++
.../trident/testing/CountAsAggregator.java | 17 ++
.../storm/trident/testing/FeederBatchSpout.java | 17 ++
.../testing/FeederCommitterBatchSpout.java | 17 ++
.../storm/trident/testing/FixedBatchSpout.java | 17 ++
.../src/jvm/storm/trident/testing/IFeeder.java | 17 ++
.../trident/testing/LRUMemoryMapState.java | 17 ++
.../storm/trident/testing/MemoryBackingMap.java | 17 ++
.../storm/trident/testing/MemoryMapState.java | 17 ++
.../storm/trident/testing/MockTridentTuple.java | 17 ++
.../src/jvm/storm/trident/testing/Split.java | 17 ++
.../jvm/storm/trident/testing/StringLength.java | 17 ++
.../jvm/storm/trident/testing/TrueFilter.java | 17 ++
.../jvm/storm/trident/testing/TuplifyArgs.java | 17 ++
.../jvm/storm/trident/topology/BatchInfo.java | 17 ++
.../trident/topology/ITridentBatchBolt.java | 17 ++
.../topology/MasterBatchCoordinator.java | 17 ++
.../trident/topology/TransactionAttempt.java | 17 ++
.../trident/topology/TridentBoltExecutor.java | 17 ++
.../topology/TridentTopologyBuilder.java | 17 ++
.../state/RotatingTransactionalState.java | 17 ++
.../topology/state/TransactionalState.java | 17 ++
.../src/jvm/storm/trident/tuple/ComboList.java | 17 ++
.../src/jvm/storm/trident/tuple/ConsList.java | 17 ++
.../jvm/storm/trident/tuple/TridentTuple.java | 17 ++
.../storm/trident/tuple/TridentTupleView.java | 17 ++
.../jvm/storm/trident/tuple/ValuePointer.java | 17 ++
.../storm/trident/util/ErrorEdgeFactory.java | 17 ++
.../src/jvm/storm/trident/util/IndexedEdge.java | 17 ++
.../src/jvm/storm/trident/util/LRUMap.java | 17 ++
.../jvm/storm/trident/util/TridentUtils.java | 17 ++
storm-core/src/multilang/py/storm.py | 18 ++
storm-core/src/multilang/rb/storm.rb | 18 ++
storm-core/src/storm.thrift | 23 ++
storm-core/src/ui/public/css/style.css | 17 ++
storm-core/src/ui/public/js/script.js | 17 ++
.../test/clj/backtype/storm/clojure_test.clj | 15 +
.../test/clj/backtype/storm/cluster_test.clj | 15 +
.../test/clj/backtype/storm/config_test.clj | 15 +
.../test/clj/backtype/storm/drpc_test.clj | 15 +
.../test/clj/backtype/storm/fields_test.clj | 15 +
.../test/clj/backtype/storm/grouping_test.clj | 15 +
.../clj/backtype/storm/integration_test.clj | 15 +
.../clj/backtype/storm/local_state_test.clj | 15 +
.../storm/messaging/netty_integration_test.clj | 15 +
.../storm/messaging/netty_unit_test.clj | 15 +
.../test/clj/backtype/storm/messaging_test.clj | 15 +
.../test/clj/backtype/storm/metrics_test.clj | 15 +
.../test/clj/backtype/storm/multilang_test.clj | 15 +
.../test/clj/backtype/storm/nimbus_test.clj | 15 +
.../test/clj/backtype/storm/scheduler_test.clj | 15 +
.../storm/security/auth/AuthUtils_test.clj | 15 +
.../storm/security/auth/ReqContext_test.clj | 15 +
.../security/auth/SaslTransportPlugin_test.clj | 15 +
.../storm/security/auth/ThriftClient_test.clj | 15 +
.../storm/security/auth/ThriftServer_test.clj | 15 +
.../backtype/storm/security/auth/auth_test.clj | 15 +
.../BlowfishTupleSerializer_test.clj | 15 +
.../serialization/SerializationFactory_test.clj | 15 +
.../clj/backtype/storm/serialization_test.clj | 15 +
.../clj/backtype/storm/subtopology_test.clj | 15 +
.../test/clj/backtype/storm/supervisor_test.clj | 15 +
.../test/clj/backtype/storm/testing4j_test.clj | 15 +
.../test/clj/backtype/storm/tick_tuple_test.clj | 15 +
.../clj/backtype/storm/transactional_test.clj | 15 +
.../test/clj/backtype/storm/tuple_test.clj | 15 +
.../test/clj/backtype/storm/utils_test.clj | 15 +
.../clj/backtype/storm/versioned_store_test.clj | 15 +
.../test/clj/storm/trident/integration_test.clj | 15 +
.../test/clj/storm/trident/state_test.clj | 15 +
.../test/clj/storm/trident/tuple_test.clj | 15 +
storm-lib/project.clj | 15 +
558 files changed, 9974 insertions(+), 695 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/conf/defaults.yaml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/project.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index c2b391a,0000000..d765e71
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@@ -1,204 -1,0 +1,221 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.Config;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.utils.Utils;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+class Client implements IConnection {
+ private static final Logger LOG = LoggerFactory.getLogger(Client.class);
+ private final int max_retries;
+ private final int base_sleep_ms;
+ private final int max_sleep_ms;
+ private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
+ private AtomicReference<Channel> channelRef;
+ private final ClientBootstrap bootstrap;
+ private InetSocketAddress remote_addr;
+ private AtomicInteger retries;
+ private final Random random = new Random();
+ private final ChannelFactory factory;
+ private final int buffer_size;
+ private final AtomicBoolean being_closed;
+
+ @SuppressWarnings("rawtypes")
+ Client(Map storm_conf, String host, int port) {
+ message_queue = new LinkedBlockingQueue<Object>();
+ retries = new AtomicInteger(0);
+ channelRef = new AtomicReference<Channel>(null);
+ being_closed = new AtomicBoolean(false);
+
+ // Configure
+ buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+ max_retries = Math.min(30, Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)));
+ base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
+ max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
+ int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
+
+ if (maxWorkers > 0) {
+ factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
+ } else {
+ factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+ }
+ bootstrap = new ClientBootstrap(factory);
+ bootstrap.setOption("tcpNoDelay", true);
+ bootstrap.setOption("sendBufferSize", buffer_size);
+ bootstrap.setOption("keepAlive", true);
+
+ // Set up the pipeline factory.
+ bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
+
+ // Start the connection attempt.
+ remote_addr = new InetSocketAddress(host, port);
+ bootstrap.connect(remote_addr);
+ }
+
+ /**
+ * We will retry connection with exponential back-off policy
+ */
+ void reconnect() {
+ try {
+ int tried_count = retries.incrementAndGet();
+ if (tried_count <= max_retries) {
+ Thread.sleep(getSleepTimeMs());
+ LOG.info("Reconnect ... [{}]", tried_count);
+ bootstrap.connect(remote_addr);
+ LOG.debug("connection started...");
+ } else {
+ LOG.warn("Remote address is not reachable. We will close this client.");
+ close();
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("connection failed", e);
+ }
+ }
+
+ /**
+ * # of milliseconds to wait per exponential back-off policy
+ */
+ private int getSleepTimeMs()
+ {
+ int backoff = 1 << retries.get();
+ int sleepMs = base_sleep_ms * Math.max(1, random.nextInt(backoff));
+ if ( sleepMs > max_sleep_ms )
+ sleepMs = max_sleep_ms;
+ return sleepMs;
+ }
+
+ /**
+ * Enqueue a task message to be sent to server
+ */
+ public void send(int task, byte[] message) {
+ //throw exception if the client is being closed
+ if (being_closed.get()) {
+ throw new RuntimeException("Client is being closed, and does not take requests any more");
+ }
+
+ try {
+ message_queue.put(new TaskMessage(task, message));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Take all enqueued messages from queue
+ * @return
+ * @throws InterruptedException
+ */
+ MessageBatch takeMessages() throws InterruptedException {
+ //1st message
+ MessageBatch batch = new MessageBatch(buffer_size);
+ Object msg = message_queue.take();
+ batch.add(msg);
+
+ //we will discard any message after CLOSE
+ if (msg==ControlMessage.CLOSE_MESSAGE)
+ return batch;
+
+ while (!batch.isFull()) {
+ //peek the next message
+ msg = message_queue.peek();
+ //no more messages
+ if (msg == null) break;
+
+ //we will discard any message after CLOSE
+ if (msg==ControlMessage.CLOSE_MESSAGE) {
+ message_queue.take();
+ batch.add(msg);
+ break;
+ }
+
+ //try to add this msg into batch
+ if (!batch.tryAdd((TaskMessage) msg))
+ break;
+
+ //remove this message
+ message_queue.take();
+ }
+
+ return batch;
+ }
+
+ /**
+ * gracefully close this client.
+ *
+ * We will send all existing requests, and then invoke close_n_release() method
+ */
+ public synchronized void close() {
+ if (!being_closed.get()) {
+ //enqueue a CLOSE message so that shutdown() will be invoked
+ try {
+ message_queue.put(ControlMessage.CLOSE_MESSAGE);
+ being_closed.set(true);
+ } catch (InterruptedException e) {
+ close_n_release();
+ }
+ }
+ }
+
+ /**
+ * close_n_release() is invoked after all messages have been sent.
+ */
+ void close_n_release() {
+ if (channelRef.get() != null)
+ channelRef.get().close().awaitUninterruptibly();
+
+ //we need to release resources
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ factory.releaseExternalResources();
+ }}).start();
+ }
+
+ public TaskMessage recv(int flags) {
+ throw new RuntimeException("Client connection should not receive any messages");
+ }
+
+ void setChannel(Channel channel) {
+ channelRef.set(channel);
+ //reset retries
+ if (channel != null)
+ retries.set(0);
+ }
+
+}
+
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 018e0f9,0000000..3e09dd1
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@@ -1,50 -1,0 +1,67 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.IContext;
+
+import java.util.Map;
+import java.util.Vector;
+
+public class Context implements IContext {
+ @SuppressWarnings("rawtypes")
+ private Map storm_conf;
+ private volatile Vector<IConnection> connections;
+
+ /**
+ * initialization per Storm configuration
+ */
+ @SuppressWarnings("rawtypes")
+ public void prepare(Map storm_conf) {
+ this.storm_conf = storm_conf;
+ connections = new Vector<IConnection>();
+ }
+
+ /**
+ * establish a server with a binding port
+ */
+ public IConnection bind(String storm_id, int port) {
+ IConnection server = new Server(storm_conf, port);
+ connections.add(server);
+ return server;
+ }
+
+ /**
+ * establish a connection to a remote server
+ */
+ public IConnection connect(String storm_id, String host, int port) {
+ IConnection client = new Client(storm_conf, host, port);
+ connections.add(client);
+ return client;
+ }
+
+ /**
+ * terminate this context
+ */
+ public void term() {
+ for (IConnection conn : connections) {
+ conn.close();
+ }
+ connections = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
index 4cc2040,0000000..a552cf7
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
@@@ -1,50 -1,0 +1,67 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+enum ControlMessage {
+ CLOSE_MESSAGE((short)-100),
+ EOB_MESSAGE((short)-201),
+ OK_RESPONSE((short)-200),
+ FAILURE_RESPONSE((short)-400);
+
+ private short code;
+
+ //private constructor
+ private ControlMessage(short code) {
+ this.code = code;
+ }
+
+ /**
+ * Return a control message per an encoded status code
+ * @param encoded
+ * @return
+ */
+ static ControlMessage mkMessage(short encoded) {
+ for(ControlMessage cm: ControlMessage.values()) {
+ if(encoded == cm.code) return cm;
+ }
+ return null;
+ }
+
+ int encodeLength() {
+ return 2; //short
+ }
+
+ /**
+ * encode the current Control Message into a channel buffer
+ * @throws Exception
+ */
+ ChannelBuffer buffer() throws Exception {
+ ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));
+ write(bout);
+ bout.close();
+ return bout.buffer();
+ }
+
+ void write(ChannelBufferOutputStream bout) throws Exception {
+ bout.writeShort(code);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
index a9d46a2,0000000..9d287e4
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@@ -1,151 -1,0 +1,168 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.util.ArrayList;
+
+class MessageBatch {
+ private int buffer_size;
+ private ArrayList<Object> msgs;
+ private int encoded_length;
+
+ MessageBatch(int buffer_size) {
+ this.buffer_size = buffer_size;
+ msgs = new ArrayList<Object>();
+ encoded_length = ControlMessage.EOB_MESSAGE.encodeLength();
+ }
+
+ void add(Object obj) {
+ if (obj == null)
+ throw new RuntimeException("null object forbidded in message batch");
+
+ if (obj instanceof TaskMessage) {
+ TaskMessage msg = (TaskMessage)obj;
+ msgs.add(msg);
+ encoded_length += msgEncodeLength(msg);
+ return;
+ }
+
+ if (obj instanceof ControlMessage) {
+ ControlMessage msg = (ControlMessage)obj;
+ msgs.add(msg);
+ encoded_length += msg.encodeLength();
+ return;
+ }
+
+ throw new RuntimeException("Unsuppoted object type "+obj.getClass().getName());
+ }
+
+ void remove(Object obj) {
+ if (obj == null) return;
+
+ if (obj instanceof TaskMessage) {
+ TaskMessage msg = (TaskMessage)obj;
+ msgs.remove(msg);
+ encoded_length -= msgEncodeLength(msg);
+ return;
+ }
+
+ if (obj instanceof ControlMessage) {
+ ControlMessage msg = (ControlMessage)obj;
+ msgs.remove(msg);
+ encoded_length -= msg.encodeLength();
+ return;
+ }
+ }
+
+ Object get(int index) {
+ return msgs.get(index);
+ }
+
+ /**
+ * try to add a TaskMessage to a batch
+ * @param taskMsg
+ * @return false if the msg could not be added due to buffer size limit; true otherwise
+ */
+ boolean tryAdd(TaskMessage taskMsg) {
+ if ((encoded_length + msgEncodeLength(taskMsg)) > buffer_size)
+ return false;
+ add(taskMsg);
+ return true;
+ }
+
+ private int msgEncodeLength(TaskMessage taskMsg) {
+ if (taskMsg == null) return 0;
+
+ int size = 6; //INT + SHORT
+ if (taskMsg.message() != null)
+ size += taskMsg.message().length;
+ return size;
+ }
+
+ /**
+ * Has this batch used up allowed buffer size
+ * @return
+ */
+ boolean isFull() {
+ return encoded_length >= buffer_size;
+ }
+
+ /**
+ * true if this batch doesn't have any messages
+ * @return
+ */
+ boolean isEmpty() {
+ return msgs.isEmpty();
+ }
+
+ /**
+ * # of msgs in this batch
+ * @return
+ */
+ int size() {
+ return msgs.size();
+ }
+
+ /**
+ * create a buffer containing the encoding of this batch
+ */
+ ChannelBuffer buffer() throws Exception {
+ ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
+
+ for (Object msg : msgs)
+ if (msg instanceof TaskMessage)
+ writeTaskMessage(bout, (TaskMessage)msg);
+ else
+ ((ControlMessage)msg).write(bout);
+
+ //add a END_OF_BATCH indicator
+ ControlMessage.EOB_MESSAGE.write(bout);
+
+ bout.close();
+
+ return bout.buffer();
+ }
+
+ /**
+ * write a TaskMessage into a stream
+ *
+ * Each TaskMessage is encoded as:
+ * task ... short(2)
+ * len ... int(4)
+ * payload ... byte[] *
+ */
+ private void writeTaskMessage(ChannelBufferOutputStream bout, TaskMessage message) throws Exception {
+ int payload_len = 0;
+ if (message.message() != null)
+ payload_len = message.message().length;
+
+ int task_id = message.task();
+ if (task_id > Short.MAX_VALUE)
+ throw new RuntimeException("Task ID should not exceed "+Short.MAX_VALUE);
+
+ bout.writeShort((short)task_id);
+ bout.writeInt(payload_len);
+ if (payload_len >0)
+ bout.write(message.message());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
index 76776a9,0000000..3365e58
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
@@@ -1,68 -1,0 +1,85 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+public class MessageDecoder extends FrameDecoder {
+ /*
+ * Each ControlMessage is encoded as:
+ * code (<0) ... short(2)
+ * Each TaskMessage is encoded as:
+ * task (>=0) ... short(2)
+ * len ... int(4)
+ * payload ... byte[] *
+ */
+ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
+ // Make sure that we have received at least a short
+ if (buf.readableBytes() < 2) {
+ //need more data
+ return null;
+ }
+
+ // Mark the current buffer position before reading task/len field
+ // because the whole frame might not be in the buffer yet.
+ // We will reset the buffer position to the marked position if
+ // there's not enough bytes in the buffer.
+ buf.markReaderIndex();
+
+ //read the short field
+ short code = buf.readShort();
+
+ //case 1: Control message
+ ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
+ if (ctrl_msg != null) return ctrl_msg;
+
+ //case 2: task Message
+ short task = code;
+
+ // Make sure that we have received at least an integer (length)
+ if (buf.readableBytes() < 4) {
+ //need more data
+ buf.resetReaderIndex();
+ return null;
+ }
+
+ // Read the length field.
+ int length = buf.readInt();
+ if (length<=0) {
+ return new TaskMessage(task, null);
+ }
+
+ // Make sure if there's enough bytes in the buffer.
+ if (buf.readableBytes() < length) {
+ // The whole bytes were not received yet - return null.
+ buf.resetReaderIndex();
+ return null;
+ }
+
+ // There's enough bytes in the buffer. Read it.
+ ChannelBuffer payload = buf.readBytes(length);
+
+ // Successfully decoded a frame.
+ // Return a TaskMessage object
+ return new TaskMessage(task,payload.array());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
index c0ac8f1,0000000..e6e65c3
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
@@@ -1,22 -1,0 +1,39 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+public class MessageEncoder extends OneToOneEncoder {
+ @Override
+ protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
+ if (obj instanceof ControlMessage) {
+ return ((ControlMessage)obj).buffer();
+ }
+
+ if (obj instanceof MessageBatch) {
+ return ((MessageBatch)obj).buffer();
+ }
+
+ throw new RuntimeException("Unsupported encoding of object of class "+obj.getClass().getName());
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index bf6825c,0000000..ad811b0
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@@ -1,119 -1,0 +1,136 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.Config;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.utils.Utils;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+class Server implements IConnection {
+ private static final Logger LOG = LoggerFactory.getLogger(Server.class);
+ @SuppressWarnings("rawtypes")
+ Map storm_conf;
+ int port;
+ private LinkedBlockingQueue<TaskMessage> message_queue;
+ volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
+ final ChannelFactory factory;
+ final ServerBootstrap bootstrap;
+
+ @SuppressWarnings("rawtypes")
+ Server(Map storm_conf, int port) {
+ this.storm_conf = storm_conf;
+ this.port = port;
+ message_queue = new LinkedBlockingQueue<TaskMessage>();
+
+ // Configure the server.
+ int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+ int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS));
+
+ if (maxWorkers > 0) {
+ factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
+ } else {
+ factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+ }
+ bootstrap = new ServerBootstrap(factory);
+ bootstrap.setOption("child.tcpNoDelay", true);
+ bootstrap.setOption("child.receiveBufferSize", buffer_size);
+ bootstrap.setOption("child.keepAlive", true);
+
+ // Set up the pipeline factory.
+ bootstrap.setPipelineFactory(new StormServerPipelineFactory(this));
+
+ // Bind and start to accept incoming connections.
+ Channel channel = bootstrap.bind(new InetSocketAddress(port));
+ allChannels.add(channel);
+ }
+
+ /**
+ * enqueue a received message
+ * @param message
+ * @throws InterruptedException
+ */
+ protected void enqueue(TaskMessage message) throws InterruptedException {
+ message_queue.put(message);
+ LOG.debug("message received with task: {}, payload size: {}", message.task(), message.message().length);
+ }
+
+ /**
+ * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
+ */
+ public TaskMessage recv(int flags) {
+ if ((flags & 0x01) == 0x01) {
+ //non-blocking
+ return message_queue.poll();
+ } else {
+ try {
+ TaskMessage request = message_queue.take();
+ LOG.debug("request to be processed: {}", request);
+ return request;
+ } catch (InterruptedException e) {
+ LOG.info("exception within msg receiving", e);
+ return null;
+ }
+ }
+ }
+
+ /**
+ * register a newly created channel
+ * @param channel
+ */
+ protected void addChannel(Channel channel) {
+ allChannels.add(channel);
+ }
+
+ /**
+ * close a channel
+ * @param channel
+ */
+ protected void closeChannel(Channel channel) {
+ channel.close().awaitUninterruptibly();
+ allChannels.remove(channel);
+ }
+
+ /**
+ * close all channels, and release resources
+ */
+ public synchronized void close() {
+ if (allChannels != null) {
+ allChannels.close().awaitUninterruptibly();
+ factory.releaseExternalResources();
+ allChannels = null;
+ }
+ }
+
+ public void send(int task, byte[] message) {
+ throw new RuntimeException("Server connection should not send any messages");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
index 6fbfb1c,0000000..65c36a7
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
@@@ -1,104 -1,0 +1,121 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class StormClientHandler extends SimpleChannelUpstreamHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
+ private Client client;
+ private AtomicBoolean being_closed;
+ long start_time;
+
+ StormClientHandler(Client client) {
+ this.client = client;
+ being_closed = new AtomicBoolean(false);
+ start_time = System.currentTimeMillis();
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) {
+ //register the newly established channel
+ Channel channel = event.getChannel();
+ client.setChannel(channel);
+ LOG.debug("connection established to a remote host");
+
+ //send next request
+ try {
+ sendRequests(channel, client.takeMessages());
+ } catch (InterruptedException e) {
+ channel.close();
+ }
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
+ LOG.debug("send/recv time (ms): {}", (System.currentTimeMillis() - start_time));
+
+ //examine the response message from server
+ ControlMessage msg = (ControlMessage)event.getMessage();
+ if (msg==ControlMessage.FAILURE_RESPONSE)
+ LOG.info("failure response:{}", msg);
+
+ //send next request
+ Channel channel = event.getChannel();
+ try {
+ sendRequests(channel, client.takeMessages());
+ } catch (InterruptedException e) {
+ channel.close();
+ }
+ }
+
+ /**
+ * Retrieve a request from message queue, and send to server
+ * @param channel
+ */
+ private void sendRequests(Channel channel, final MessageBatch requests) {
+ if (requests==null || requests.size()==0 || being_closed.get()) return;
+
+ //if task==CLOSE_MESSAGE for our last request, the channel is to be closed
+ Object last_msg = requests.get(requests.size()-1);
+ if (last_msg==ControlMessage.CLOSE_MESSAGE) {
+ being_closed.set(true);
+ requests.remove(last_msg);
+ }
+
+ //we may don't need do anything if no requests found
+ if (requests.isEmpty()) {
+ if (being_closed.get())
+ client.close_n_release();
+ return;
+ }
+
+ //write request into socket channel
+ ChannelFuture future = channel.write(requests);
+ future.addListener(new ChannelFutureListener() {
+ public void operationComplete(ChannelFuture future)
+ throws Exception {
+ if (!future.isSuccess()) {
+ LOG.info("failed to send requests:", future.getCause());
+ future.getChannel().close();
+ } else {
+ LOG.debug("{} request(s) sent", requests.size());
+ }
+ if (being_closed.get())
+ client.close_n_release();
+ }
+ });
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
+ Throwable cause = event.getCause();
+ if (!(cause instanceof ConnectException)) {
+ LOG.info("Connection failed:", cause);
+ }
+ if (!being_closed.get()) {
+ client.setChannel(null);
+ client.reconnect();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
index 91c513a,0000000..6bad8e3
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
@@@ -1,27 -1,0 +1,44 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+
+class StormClientPipelineFactory implements ChannelPipelineFactory {
+ private Client client;
+
+ StormClientPipelineFactory(Client client) {
+ this.client = client;
+ }
+
+ public ChannelPipeline getPipeline() throws Exception {
+ // Create a default pipeline implementation.
+ ChannelPipeline pipeline = Channels.pipeline();
+
+ // Decoder
+ pipeline.addLast("decoder", new MessageDecoder());
+ // Encoder
+ pipeline.addLast("encoder", new MessageEncoder());
+ // business logic.
+ pipeline.addLast("handler", new StormClientHandler(client));
+
+ return pipeline;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
index 9a5aaed,0000000..093fb61
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
@@@ -1,53 -1,0 +1,70 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+import org.jboss.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+class StormServerHandler extends SimpleChannelUpstreamHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class);
+ Server server;
+ private AtomicInteger failure_count;
+
+ StormServerHandler(Server server) {
+ this.server = server;
+ failure_count = new AtomicInteger(0);
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ server.addChannel(e.getChannel());
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ Object msg = e.getMessage();
+ if (msg == null) return;
+
+ //end of batch?
+ if (msg==ControlMessage.EOB_MESSAGE) {
+ Channel channel = ctx.getChannel();
+ LOG.debug("Send back response ...");
+ if (failure_count.get()==0)
+ channel.write(ControlMessage.OK_RESPONSE);
+ else channel.write(ControlMessage.FAILURE_RESPONSE);
+ return;
+ }
+
+ //enqueue the received message for processing
+ try {
+ server.enqueue((TaskMessage)msg);
+ } catch (InterruptedException e1) {
+ LOG.info("failed to enqueue a request message", e);
+ failure_count.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ server.closeChannel(e.getChannel());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
index 56b0834,0000000..df29ba8
mode 100644,000000..100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
@@@ -1,28 -1,0 +1,45 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+
+
+class StormServerPipelineFactory implements ChannelPipelineFactory {
+ private Server server;
+
+ StormServerPipelineFactory(Server server) {
+ this.server = server;
+ }
+
+ public ChannelPipeline getPipeline() throws Exception {
+ // Create a default pipeline implementation.
+ ChannelPipeline pipeline = Channels.pipeline();
+
+ // Decoder
+ pipeline.addLast("decoder", new MessageDecoder());
+ // Encoder
+ pipeline.addLast("encoder", new MessageEncoder());
+ // business logic.
+ pipeline.addLast("handler", new StormServerHandler(server));
+
+ return pipeline;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
index eefcb48,0000000..0c908c5
mode 100644,000000..100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
@@@ -1,44 -1,0 +1,59 @@@
++;; Licensed to the Apache Software Foundation (ASF) under one
++;; or more contributor license agreements. See the NOTICE file
++;; distributed with this work for additional information
++;; regarding copyright ownership. The ASF licenses this file
++;; to you under the Apache License, Version 2.0 (the
++;; "License"); you may not use this file except in compliance
++;; with the License. You may obtain a copy of the License at
++;;
++;; http:;; www.apache.org/licenses/LICENSE-2.0
++;;
++;; Unless required by applicable law or agreed to in writing, software
++;; distributed under the License is distributed on an "AS IS" BASIS,
++;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++;; See the License for the specific language governing permissions and
++;; limitations under the License.
+(ns backtype.storm.messaging.netty-integration-test
+ (:use [clojure test])
+ (:import [backtype.storm.messaging TransportFactory])
+ (:import [backtype.storm.testing TestWordSpout TestGlobalCount])
+ (:use [backtype.storm bootstrap testing util]))
+
+(bootstrap)
+
+(deftest test-integration
+ (with-simulated-time-local-cluster [cluster :supervisors 4 :supervisor-slot-port-min 6710
+ :daemon-conf {STORM-LOCAL-MODE-ZMQ true
+ STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
+ STORM-MESSAGING-NETTY-MAX-RETRIES 10
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+ STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+ STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+ }]
+ (let [topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
+ {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
+ :parallelism-hint 6)})
+ results (complete-topology cluster
+ topology
+ ;; important for test that
+ ;; #tuples = multiple of 4 and 6
+ :storm-conf {TOPOLOGY-WORKERS 3}
+ :mock-sources {"1" [["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ]}
+ )]
+ (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
+ (read-tuples results "2"))))))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index 12ebe5d,0000000..20914ef
mode 100644,000000..100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@@ -1,97 -1,0 +1,112 @@@
++;; Licensed to the Apache Software Foundation (ASF) under one
++;; or more contributor license agreements. See the NOTICE file
++;; distributed with this work for additional information
++;; regarding copyright ownership. The ASF licenses this file
++;; to you under the Apache License, Version 2.0 (the
++;; "License"); you may not use this file except in compliance
++;; with the License. You may obtain a copy of the License at
++;;
++;; http:;; www.apache.org/licenses/LICENSE-2.0
++;;
++;; Unless required by applicable law or agreed to in writing, software
++;; distributed under the License is distributed on an "AS IS" BASIS,
++;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++;; See the License for the specific language governing permissions and
++;; limitations under the License.
+(ns backtype.storm.messaging.netty-unit-test
+ (:use [clojure test])
+ (:import [backtype.storm.messaging TransportFactory])
+ (:use [backtype.storm bootstrap testing util]))
+
+(bootstrap)
+
+(def port 6700)
+(def task 1)
+
+(deftest test-basic
+ (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
+ storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
+ STORM-MESSAGING-NETTY-MAX-RETRIES 10
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+ STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+ STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+ }
+ context (TransportFactory/makeContext storm-conf)
+ server (.bind context nil port)
+ client (.connect context nil "localhost" port)
+ _ (.send client task (.getBytes req_msg))
+ resp (.recv server 0)]
+ (is (= task (.task resp)))
+ (is (= req_msg (String. (.message resp))))
+ (.close client)
+ (.close server)
+ (.term context)))
+
+(deftest test-large-msg
+ (let [req_msg (apply str (repeat 2048000 'c'))
+ storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM-MESSAGING-NETTY-BUFFER-SIZE 102400
+ STORM-MESSAGING-NETTY-MAX-RETRIES 10
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+ STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+ STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+ }
+ context (TransportFactory/makeContext storm-conf)
+ server (.bind context nil port)
+ client (.connect context nil "localhost" port)
+ _ (.send client task (.getBytes req_msg))
+ resp (.recv server 0)]
+ (is (= task (.task resp)))
+ (is (= req_msg (String. (.message resp))))
+ (.close client)
+ (.close server)
+ (.term context)))
+
+(deftest test-server-delayed
+ (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
+ storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
+ STORM-MESSAGING-NETTY-MAX-RETRIES 10
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+ STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+ STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+ }
+ context (TransportFactory/makeContext storm-conf)
+ client (.connect context nil "localhost" port)
+ _ (.send client task (.getBytes req_msg))
+ _ (Thread/sleep 1000)
+ server (.bind context nil port)
+ resp (.recv server 0)]
+ (is (= task (.task resp)))
+ (is (= req_msg (String. (.message resp))))
+ (.close client)
+ (.close server)
+ (.term context)))
+
+(deftest test-batch
+ (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+ STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
+ STORM-MESSAGING-NETTY-MAX-RETRIES 10
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+ STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+ STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+ }
+ context (TransportFactory/makeContext storm-conf)
+ server (.bind context nil port)
+ client (.connect context nil "localhost" port)]
+ (doseq [num (range 1 100000)]
+ (let [req_msg (str num)]
+ (.send client task (.getBytes req_msg))))
+ (doseq [num (range 1 100000)]
+ (let [req_msg (str num)
+ resp (.recv server 0)
+ resp_msg (String. (.message resp))]
+ (is (= req_msg resp_msg))))
+ (.close client)
+ (.close server)
+ (.term context)))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/962d5207/storm-core/test/clj/backtype/storm/messaging_test.clj
----------------------------------------------------------------------