You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:04 UTC
[08/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs
to org.apache.storm, but try to provide some form of backwards compatability
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java b/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java
deleted file mode 100644
index 1960371..0000000
--- a/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.cluster;
-
-import clojure.lang.APersistentMap;
-import clojure.lang.IFn;
-import java.util.List;
-import org.apache.zookeeper.data.ACL;
-
-/**
- * ClusterState provides the API for the pluggable state store used by the
- * Storm daemons. Data is stored in path/value format, and the store supports
- * listing sub-paths at a given path.
- * All data should be available across all nodes with eventual consistency.
- *
- * IMPORTANT NOTE:
- * Heartbeats have different api calls used to interact with them. The root
- * path (/) may or may not be the same as the root path for the other api calls.
- *
- * For example, performing these two calls:
- * set_data("/path", data, acls);
- * void set_worker_hb("/path", heartbeat, acls);
- * may or may not cause a collision in "/path".
- * Never use the same paths with the *_hb* methods as you do with the others.
- */
-public interface ClusterState {
-
- /**
- * Registers a callback function that gets called when CuratorEvents happen.
- * @param callback is a clojure IFn that accepts the type - translated to
- * clojure keyword as in zookeeper.clj - and the path: (callback type path)
- * @return is an id that can be passed to unregister(...) to unregister the
- * callback.
- */
- String register(IFn callback);
-
- /**
- * Unregisters a callback function that was registered with register(...).
- * @param id is the String id that was returned from register(...).
- */
- void unregister(String id);
-
- /**
- * Path will be appended with a monotonically increasing integer, a new node
- * will be created there, and data will be put at that node.
- * @param path The path that the monotonically increasing integer suffix will
- * be added to.
- * @param data The data that will be written at the suffixed path's node.
- * @param acls The acls to apply to the path. May be null.
- * @return The path with the integer suffix appended.
- */
- String create_sequential(String path, byte[] data, List<ACL> acls);
-
- /**
- * Creates nodes for path and all its parents. Path elements are separated by
- * a "/", as in *nix filesystem notation. Equivalent to mkdir -p in *nix.
- * @param path The path to create, along with all its parents.
- * @param acls The acls to apply to the path. May be null.
- * @return path
- */
- String mkdirs(String path, List<ACL> acls);
-
- /**
- * Deletes the node at a given path, and any child nodes that may exist.
- * @param path The path to delete
- */
- void delete_node(String path);
-
- /**
- * Creates an ephemeral node at path. Ephemeral nodes are destroyed
- * by the store when the client disconnects.
- * @param path The path where a node will be created.
- * @param data The data to be written at the node.
- * @param acls The acls to apply to the path. May be null.
- */
- void set_ephemeral_node(String path, byte[] data, List<ACL> acls);
-
- /**
- * Gets the 'version' of the node at a path. Optionally sets a watch
- * on that node. The version should increase whenever a write happens.
- * @param path The path to get the version of.
- * @param watch Whether or not to set a watch on the path. Watched paths
- * emit events which are consumed by functions registered with the
- * register method. Very useful for catching updates to nodes.
- * @return The integer version of this node.
- */
- Integer get_version(String path, boolean watch);
-
- /**
- * Check if a node exists and optionally set a watch on the path.
- * @param path The path to check for the existence of a node.
- * @param watch Whether or not to set a watch on the path. Watched paths
- * emit events which are consumed by functions registered with the
- * register method. Very useful for catching updates to nodes.
- * @return Whether or not a node exists at path.
- */
- boolean node_exists(String path, boolean watch);
-
- /**
- * Get a list of paths of all the child nodes which exist immediately
- * under path.
- * @param path The path to look under
- * @param watch Whether or not to set a watch on the path. Watched paths
- * emit events which are consumed by functions registered with the
- * register method. Very useful for catching updates to nodes.
- * @return list of string paths under path.
- */
- List<String> get_children(String path, boolean watch);
-
- /**
- * Close the connection to the data store.
- */
- void close();
-
- /**
- * Set the value of the node at path to data.
- * @param path The path whose node we want to set.
- * @param data The data to put in the node.
- * @param acls The acls to apply to the path. May be null.
- */
- void set_data(String path, byte[] data, List<ACL> acls);
-
- /**
- * Get the data from the node at path
- * @param path The path to look under
- * @param watch Whether or not to set a watch on the path. Watched paths
- * emit events which are consumed by functions registered with the
- * register method. Very useful for catching updates to nodes.
- * @return The data at the node.
- */
- byte[] get_data(String path, boolean watch);
-
- /**
- * Get the data at the node along with its version. Data is returned
- * in an APersistentMap with clojure keyword keys :data and :version.
- * @param path The path to look under
- * @param watch Whether or not to set a watch on the path. Watched paths
- * emit events which are consumed by functions registered with the
- * register method. Very useful for catching updates to nodes.
- * @return An APersistentMap in the form {:data data :version version}
- */
- APersistentMap get_data_with_version(String path, boolean watch);
-
- /**
- * Write a worker heartbeat at the path.
- * @param path The path whose node we want to set.
- * @param data The data to put in the node.
- * @param acls The acls to apply to the path. May be null.
- */
- void set_worker_hb(String path, byte[] data, List<ACL> acls);
-
- /**
- * Get the heartbeat from the node at path
- * @param path The path to look under
- * @param watch Whether or not to set a watch on the path. Watched paths
- * emit events which are consumed by functions registered with the
- * register method. Very useful for catching updates to nodes.
- * @return The heartbeat at the node.
- */
- byte[] get_worker_hb(String path, boolean watch);
-
- /**
- * Get a list of paths of all the child nodes which exist immediately
- * under path. This is similar to get_children, but must be used for
- * any nodes
- * @param path The path to look under
- * @param watch Whether or not to set a watch on the path. Watched paths
- * emit events which are consumed by functions registered with the
- * register method. Very useful for catching updates to nodes.
- * @return list of string paths under path.
- */
- List<String> get_worker_hb_children(String path, boolean watch);
-
- /**
- * Deletes the heartbeat at a given path, and any child nodes that may exist.
- * @param path The path to delete.
- */
- void delete_worker_hb(String path);
-
- /**
- * Add a ClusterStateListener to the connection.
- * @param listener A ClusterStateListener to handle changing cluster state
- * events.
- */
- void add_listener(ClusterStateListener listener);
-
- /**
- * Force consistency on a path. Any writes committed on the path before
- * this call will be completely propagated when it returns.
- * @param path The path to synchronize.
- */
- void sync_path(String path);
-
- /**
- * Allows us to delete the znodes within /storm/blobstore/key_name
- * whose znodes start with the corresponding nimbusHostPortInfo
- * @param path /storm/blobstore/key_name
- * @param nimbusHostPortInfo Contains the host port information of
- * a nimbus node.
- */
- void delete_node_blobstore(String path, String nimbusHostPortInfo);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/cluster/ClusterStateContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/cluster/ClusterStateContext.java b/storm-core/src/jvm/backtype/storm/cluster/ClusterStateContext.java
deleted file mode 100644
index 5ccde23..0000000
--- a/storm-core/src/jvm/backtype/storm/cluster/ClusterStateContext.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm.cluster;
-
-/**
- * This class is intended to provide runtime-context to ClusterStateFactory
- * implementors, giving information such as what daemon is creating it.
- */
-public class ClusterStateContext {
-
- private DaemonType daemonType;
-
- public ClusterStateContext() {
- daemonType = DaemonType.UNKNOWN;
- }
-
- public ClusterStateContext(DaemonType daemonType) {
- this.daemonType = daemonType;
- }
-
- public DaemonType getDaemonType() {
- return daemonType;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/cluster/ClusterStateFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/cluster/ClusterStateFactory.java b/storm-core/src/jvm/backtype/storm/cluster/ClusterStateFactory.java
deleted file mode 100644
index 1f946ee..0000000
--- a/storm-core/src/jvm/backtype/storm/cluster/ClusterStateFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.cluster;
-
-import clojure.lang.APersistentMap;
-import java.util.List;
-import org.apache.zookeeper.data.ACL;
-
-public interface ClusterStateFactory {
-
- ClusterState mkState(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context);
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/cluster/ClusterStateListener.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/cluster/ClusterStateListener.java b/storm-core/src/jvm/backtype/storm/cluster/ClusterStateListener.java
deleted file mode 100644
index 22693f8..0000000
--- a/storm-core/src/jvm/backtype/storm/cluster/ClusterStateListener.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.cluster;
-
-public interface ClusterStateListener {
- void stateChanged(ConnectionState newState);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/cluster/ConnectionState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/cluster/ConnectionState.java b/storm-core/src/jvm/backtype/storm/cluster/ConnectionState.java
deleted file mode 100644
index d6887da..0000000
--- a/storm-core/src/jvm/backtype/storm/cluster/ConnectionState.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.cluster;
-
-public enum ConnectionState {
- CONNECTED,
- RECONNECTED,
- LOST
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/cluster/DaemonType.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/cluster/DaemonType.java b/storm-core/src/jvm/backtype/storm/cluster/DaemonType.java
deleted file mode 100644
index 684d0ef..0000000
--- a/storm-core/src/jvm/backtype/storm/cluster/DaemonType.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm.cluster;
-
-public enum DaemonType {
- SUPERVISOR,
- NIMBUS,
- WORKER,
- PACEMAKER,
- UNKNOWN
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java b/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
deleted file mode 100644
index 55590d0..0000000
--- a/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.coordination;
-
-import backtype.storm.coordination.CoordinatedBolt.FinishedCallback;
-import backtype.storm.coordination.CoordinatedBolt.TimeoutCallback;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.FailedException;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
-import java.util.HashMap;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCallback {
- public static final Logger LOG = LoggerFactory.getLogger(BatchBoltExecutor.class);
-
- byte[] _boltSer;
- Map<Object, IBatchBolt> _openTransactions;
- Map _conf;
- TopologyContext _context;
- BatchOutputCollectorImpl _collector;
-
- public BatchBoltExecutor(IBatchBolt bolt) {
- _boltSer = Utils.javaSerialize(bolt);
- }
-
- @Override
- public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- _conf = conf;
- _context = context;
- _collector = new BatchOutputCollectorImpl(collector);
- _openTransactions = new HashMap<>();
- }
-
- @Override
- public void execute(Tuple input) {
- Object id = input.getValue(0);
- IBatchBolt bolt = getBatchBolt(id);
- try {
- bolt.execute(input);
- _collector.ack(input);
- } catch(FailedException e) {
- LOG.error("Failed to process tuple in batch", e);
- _collector.fail(input);
- }
- }
-
- @Override
- public void cleanup() {
- }
-
- @Override
- public void finishedId(Object id) {
- IBatchBolt bolt = getBatchBolt(id);
- _openTransactions.remove(id);
- bolt.finishBatch();
- }
-
- @Override
- public void timeoutId(Object attempt) {
- _openTransactions.remove(attempt);
- }
-
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- newTransactionalBolt().declareOutputFields(declarer);
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return newTransactionalBolt().getComponentConfiguration();
- }
-
- private IBatchBolt getBatchBolt(Object id) {
- IBatchBolt bolt = _openTransactions.get(id);
- if(bolt==null) {
- bolt = newTransactionalBolt();
- bolt.prepare(_conf, _context, _collector, id);
- _openTransactions.put(id, bolt);
- }
- return bolt;
- }
-
- private IBatchBolt newTransactionalBolt() {
- return Utils.javaDeserialize(_boltSer, IBatchBolt.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollector.java b/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollector.java
deleted file mode 100644
index f5f3457..0000000
--- a/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollector.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.coordination;
-
-import backtype.storm.utils.Utils;
-import java.util.List;
-
-public abstract class BatchOutputCollector {
-
- /**
- * Emits a tuple to the default output stream.
- */
- public List<Integer> emit(List<Object> tuple) {
- return emit(Utils.DEFAULT_STREAM_ID, tuple);
- }
-
- public abstract List<Integer> emit(String streamId, List<Object> tuple);
-
- /**
- * Emits a tuple to the specified task on the default output stream. This output
- * stream must have been declared as a direct stream, and the specified task must
- * use a direct grouping on this stream to receive the message.
- */
- public void emitDirect(int taskId, List<Object> tuple) {
- emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
- }
-
- public abstract void emitDirect(int taskId, String streamId, List<Object> tuple);
-
- public abstract void reportError(Throwable error);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollectorImpl.java b/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollectorImpl.java
deleted file mode 100644
index cae7560..0000000
--- a/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollectorImpl.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.coordination;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.tuple.Tuple;
-import java.util.List;
-
-public class BatchOutputCollectorImpl extends BatchOutputCollector {
- OutputCollector _collector;
-
- public BatchOutputCollectorImpl(OutputCollector collector) {
- _collector = collector;
- }
-
- @Override
- public List<Integer> emit(String streamId, List<Object> tuple) {
- return _collector.emit(streamId, tuple);
- }
-
- @Override
- public void emitDirect(int taskId, String streamId, List<Object> tuple) {
- _collector.emitDirect(taskId, streamId, tuple);
- }
-
- @Override
- public void reportError(Throwable error) {
- _collector.reportError(error);
- }
-
- public void ack(Tuple tup) {
- _collector.ack(tup);
- }
-
- public void fail(Tuple tup) {
- _collector.fail(tup);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java b/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java
deleted file mode 100644
index 1dd1c9f..0000000
--- a/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java
+++ /dev/null
@@ -1,447 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.coordination;
-
-import backtype.storm.Constants;
-import backtype.storm.coordination.CoordinatedBolt.SourceArgs;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.grouping.PartialKeyGrouping;
-import backtype.storm.topology.BaseConfigurationDeclarer;
-import backtype.storm.topology.BasicBoltExecutor;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IBasicBolt;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.InputDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class BatchSubtopologyBuilder {
- Map<String, Component> _bolts = new HashMap<String, Component>();
- Component _masterBolt;
- String _masterId;
-
- public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt, Number boltParallelism) {
- Integer p = boltParallelism == null ? null : boltParallelism.intValue();
- _masterBolt = new Component(new BasicBoltExecutor(masterBolt), p);
- _masterId = masterBoltId;
- }
-
- public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt) {
- this(masterBoltId, masterBolt, null);
- }
-
- public BoltDeclarer getMasterDeclarer() {
- return new BoltDeclarerImpl(_masterBolt);
- }
-
- public BoltDeclarer setBolt(String id, IBatchBolt bolt) {
- return setBolt(id, bolt, null);
- }
-
- public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) {
- return setBolt(id, new BatchBoltExecutor(bolt), parallelism);
- }
-
- public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
- return setBolt(id, bolt, null);
- }
-
- public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) {
- return setBolt(id, new BasicBoltExecutor(bolt), parallelism);
- }
-
- private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) {
- Integer p = null;
- if(parallelism!=null) p = parallelism.intValue();
- Component component = new Component(bolt, p);
- _bolts.put(id, component);
- return new BoltDeclarerImpl(component);
- }
-
- public void extendTopology(TopologyBuilder builder) {
- BoltDeclarer declarer = builder.setBolt(_masterId, new CoordinatedBolt(_masterBolt.bolt), _masterBolt.parallelism);
- for(InputDeclaration decl: _masterBolt.declarations) {
- decl.declare(declarer);
- }
- for(Map conf: _masterBolt.componentConfs) {
- declarer.addConfigurations(conf);
- }
- for(String id: _bolts.keySet()) {
- Component component = _bolts.get(id);
- Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>();
- for(String c: componentBoltSubscriptions(component)) {
- SourceArgs source;
- if(c.equals(_masterId)) {
- source = SourceArgs.single();
- } else {
- source = SourceArgs.all();
- }
- coordinatedArgs.put(c, source);
- }
-
-
- BoltDeclarer input = builder.setBolt(id,
- new CoordinatedBolt(component.bolt,
- coordinatedArgs,
- null),
- component.parallelism);
- for(Map conf: component.componentConfs) {
- input.addConfigurations(conf);
- }
- for(String c: componentBoltSubscriptions(component)) {
- input.directGrouping(c, Constants.COORDINATED_STREAM_ID);
- }
- for(InputDeclaration d: component.declarations) {
- d.declare(input);
- }
- }
- }
-
- private Set<String> componentBoltSubscriptions(Component component) {
- Set<String> ret = new HashSet<String>();
- for(InputDeclaration d: component.declarations) {
- ret.add(d.getComponent());
- }
- return ret;
- }
-
- private static class Component {
- public IRichBolt bolt;
- public Integer parallelism;
- public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
- public List<Map<String, Object>> componentConfs = new ArrayList<>();
-
- public Component(IRichBolt bolt, Integer parallelism) {
- this.bolt = bolt;
- this.parallelism = parallelism;
- }
- }
-
- private static interface InputDeclaration {
- void declare(InputDeclarer declarer);
- String getComponent();
- }
-
- private static class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
- Component _component;
-
- public BoltDeclarerImpl(Component component) {
- _component = component;
- }
-
- @Override
- public BoltDeclarer fieldsGrouping(final String component, final Fields fields) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.fieldsGrouping(component, fields);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer fieldsGrouping(final String component, final String streamId, final Fields fields) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.fieldsGrouping(component, streamId, fields);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer globalGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.globalGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer globalGrouping(final String component, final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.globalGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer shuffleGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.shuffleGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer shuffleGrouping(final String component, final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.shuffleGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer localOrShuffleGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.localOrShuffleGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer localOrShuffleGrouping(final String component, final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.localOrShuffleGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer noneGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.noneGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer noneGrouping(final String component, final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.noneGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer allGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.allGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer allGrouping(final String component, final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.allGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer directGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.directGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer directGrouping(final String component, final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.directGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) {
- return customGrouping(componentId, new PartialKeyGrouping(fields));
- }
-
- @Override
- public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) {
- return customGrouping(componentId, streamId, new PartialKeyGrouping(fields));
- }
-
- @Override
- public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.customGrouping(component, grouping);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer customGrouping(final String component, final String streamId, final CustomStreamGrouping grouping) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.customGrouping(component, streamId, grouping);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer grouping(final GlobalStreamId stream, final Grouping grouping) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.grouping(stream, grouping);
- }
-
- @Override
- public String getComponent() {
- return stream.get_componentId();
- }
- });
- return this;
- }
-
- private void addDeclaration(InputDeclaration declaration) {
- _component.declarations.add(declaration);
- }
-
- @Override
- public BoltDeclarer addConfigurations(Map<String, Object> conf) {
- _component.componentConfs.add(conf);
- return this;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/coordination/CoordinatedBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/coordination/CoordinatedBolt.java b/storm-core/src/jvm/backtype/storm/coordination/CoordinatedBolt.java
deleted file mode 100644
index c3a428c..0000000
--- a/storm-core/src/jvm/backtype/storm/coordination/CoordinatedBolt.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.coordination;
-
-import backtype.storm.topology.FailedException;
-import java.util.Map.Entry;
-import backtype.storm.tuple.Values;
-import backtype.storm.generated.GlobalStreamId;
-import java.util.Collection;
-import backtype.storm.Constants;
-import backtype.storm.generated.Grouping;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.TimeCacheMap;
-import backtype.storm.utils.Utils;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static backtype.storm.utils.Utils.get;
-
-/**
- * Coordination requires the request ids to be globally unique for awhile. This is so it doesn't get confused
- * in the case of retries.
- */
-public class CoordinatedBolt implements IRichBolt {
- public static final Logger LOG = LoggerFactory.getLogger(CoordinatedBolt.class);
-
- public static interface FinishedCallback {
- void finishedId(Object id);
- }
-
- public static interface TimeoutCallback {
- void timeoutId(Object id);
- }
-
-
- public static class SourceArgs implements Serializable {
- public boolean singleCount;
-
- protected SourceArgs(boolean singleCount) {
- this.singleCount = singleCount;
- }
-
- public static SourceArgs single() {
- return new SourceArgs(true);
- }
-
- public static SourceArgs all() {
- return new SourceArgs(false);
- }
-
- @Override
- public String toString() {
- return "<Single: " + singleCount + ">";
- }
- }
-
- public class CoordinatedOutputCollector implements IOutputCollector {
- IOutputCollector _delegate;
-
- public CoordinatedOutputCollector(IOutputCollector delegate) {
- _delegate = delegate;
- }
-
- public List<Integer> emit(String stream, Collection<Tuple> anchors, List<Object> tuple) {
- List<Integer> tasks = _delegate.emit(stream, anchors, tuple);
- updateTaskCounts(tuple.get(0), tasks);
- return tasks;
- }
-
- public void emitDirect(int task, String stream, Collection<Tuple> anchors, List<Object> tuple) {
- updateTaskCounts(tuple.get(0), Arrays.asList(task));
- _delegate.emitDirect(task, stream, anchors, tuple);
- }
-
- public void ack(Tuple tuple) {
- Object id = tuple.getValue(0);
- synchronized(_tracked) {
- TrackingInfo track = _tracked.get(id);
- if (track != null)
- track.receivedTuples++;
- }
- boolean failed = checkFinishId(tuple, TupleType.REGULAR);
- if(failed) {
- _delegate.fail(tuple);
- } else {
- _delegate.ack(tuple);
- }
- }
-
- public void fail(Tuple tuple) {
- Object id = tuple.getValue(0);
- synchronized(_tracked) {
- TrackingInfo track = _tracked.get(id);
- if (track != null)
- track.failed = true;
- }
- checkFinishId(tuple, TupleType.REGULAR);
- _delegate.fail(tuple);
- }
-
- public void reportError(Throwable error) {
- _delegate.reportError(error);
- }
-
-
- private void updateTaskCounts(Object id, List<Integer> tasks) {
- synchronized(_tracked) {
- TrackingInfo track = _tracked.get(id);
- if (track != null) {
- Map<Integer, Integer> taskEmittedTuples = track.taskEmittedTuples;
- for(Integer task: tasks) {
- int newCount = get(taskEmittedTuples, task, 0) + 1;
- taskEmittedTuples.put(task, newCount);
- }
- }
- }
- }
- }
-
- private Map<String, SourceArgs> _sourceArgs;
- private IdStreamSpec _idStreamSpec;
- private IRichBolt _delegate;
- private Integer _numSourceReports;
- private List<Integer> _countOutTasks = new ArrayList<>();
- private OutputCollector _collector;
- private TimeCacheMap<Object, TrackingInfo> _tracked;
-
- public static class TrackingInfo {
- int reportCount = 0;
- int expectedTupleCount = 0;
- int receivedTuples = 0;
- boolean failed = false;
- Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
- boolean receivedId = false;
- boolean finished = false;
- List<Tuple> ackTuples = new ArrayList<>();
-
- @Override
- public String toString() {
- return "reportCount: " + reportCount + "\n" +
- "expectedTupleCount: " + expectedTupleCount + "\n" +
- "receivedTuples: " + receivedTuples + "\n" +
- "failed: " + failed + "\n" +
- taskEmittedTuples.toString();
- }
- }
-
-
- public static class IdStreamSpec implements Serializable {
- GlobalStreamId _id;
-
- public GlobalStreamId getGlobalStreamId() {
- return _id;
- }
-
- public static IdStreamSpec makeDetectSpec(String component, String stream) {
- return new IdStreamSpec(component, stream);
- }
-
- protected IdStreamSpec(String component, String stream) {
- _id = new GlobalStreamId(component, stream);
- }
- }
-
- public CoordinatedBolt(IRichBolt delegate) {
- this(delegate, null, null);
- }
-
- public CoordinatedBolt(IRichBolt delegate, String sourceComponent, SourceArgs sourceArgs, IdStreamSpec idStreamSpec) {
- this(delegate, singleSourceArgs(sourceComponent, sourceArgs), idStreamSpec);
- }
-
- public CoordinatedBolt(IRichBolt delegate, Map<String, SourceArgs> sourceArgs, IdStreamSpec idStreamSpec) {
- _sourceArgs = sourceArgs;
- if(_sourceArgs==null) _sourceArgs = new HashMap<>();
- _delegate = delegate;
- _idStreamSpec = idStreamSpec;
- }
-
- public void prepare(Map config, TopologyContext context, OutputCollector collector) {
- TimeCacheMap.ExpiredCallback<Object, TrackingInfo> callback = null;
- if(_delegate instanceof TimeoutCallback) {
- callback = new TimeoutItems();
- }
- _tracked = new TimeCacheMap<>(context.maxTopologyMessageTimeout(), callback);
- _collector = collector;
- _delegate.prepare(config, context, new OutputCollector(new CoordinatedOutputCollector(collector)));
- for(String component: Utils.get(context.getThisTargets(),
- Constants.COORDINATED_STREAM_ID,
- new HashMap<String, Grouping>())
- .keySet()) {
- for(Integer task: context.getComponentTasks(component)) {
- _countOutTasks.add(task);
- }
- }
- if(!_sourceArgs.isEmpty()) {
- _numSourceReports = 0;
- for(Entry<String, SourceArgs> entry: _sourceArgs.entrySet()) {
- if(entry.getValue().singleCount) {
- _numSourceReports+=1;
- } else {
- _numSourceReports+=context.getComponentTasks(entry.getKey()).size();
- }
- }
- }
- }
-
- private boolean checkFinishId(Tuple tup, TupleType type) {
- Object id = tup.getValue(0);
- boolean failed = false;
-
- synchronized(_tracked) {
- TrackingInfo track = _tracked.get(id);
- try {
- if(track!=null) {
- boolean delayed = false;
- if(_idStreamSpec==null && type == TupleType.COORD || _idStreamSpec!=null && type==TupleType.ID) {
- track.ackTuples.add(tup);
- delayed = true;
- }
- if(track.failed) {
- failed = true;
- for(Tuple t: track.ackTuples) {
- _collector.fail(t);
- }
- _tracked.remove(id);
- } else if(track.receivedId
- && (_sourceArgs.isEmpty() ||
- track.reportCount==_numSourceReports &&
- track.expectedTupleCount == track.receivedTuples)){
- if(_delegate instanceof FinishedCallback) {
- ((FinishedCallback)_delegate).finishedId(id);
- }
- if(!(_sourceArgs.isEmpty() || type!=TupleType.REGULAR)) {
- throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
- }
- Iterator<Integer> outTasks = _countOutTasks.iterator();
- while(outTasks.hasNext()) {
- int task = outTasks.next();
- int numTuples = get(track.taskEmittedTuples, task, 0);
- _collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));
- }
- for(Tuple t: track.ackTuples) {
- _collector.ack(t);
- }
- track.finished = true;
- _tracked.remove(id);
- }
- if(!delayed && type!=TupleType.REGULAR) {
- if(track.failed) {
- _collector.fail(tup);
- } else {
- _collector.ack(tup);
- }
- }
- } else {
- if(type!=TupleType.REGULAR) _collector.fail(tup);
- }
- } catch(FailedException e) {
- LOG.error("Failed to finish batch", e);
- for(Tuple t: track.ackTuples) {
- _collector.fail(t);
- }
- _tracked.remove(id);
- failed = true;
- }
- }
- return failed;
- }
-
- public void execute(Tuple tuple) {
- Object id = tuple.getValue(0);
- TrackingInfo track;
- TupleType type = getTupleType(tuple);
- synchronized(_tracked) {
- track = _tracked.get(id);
- if(track==null) {
- track = new TrackingInfo();
- if(_idStreamSpec==null) track.receivedId = true;
- _tracked.put(id, track);
- }
- }
-
- if(type==TupleType.ID) {
- synchronized(_tracked) {
- track.receivedId = true;
- }
- checkFinishId(tuple, type);
- } else if(type==TupleType.COORD) {
- int count = (Integer) tuple.getValue(1);
- synchronized(_tracked) {
- track.reportCount++;
- track.expectedTupleCount+=count;
- }
- checkFinishId(tuple, type);
- } else {
- synchronized(_tracked) {
- _delegate.execute(tuple);
- }
- }
- }
-
- public void cleanup() {
- _delegate.cleanup();
- _tracked.cleanup();
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- _delegate.declareOutputFields(declarer);
- declarer.declareStream(Constants.COORDINATED_STREAM_ID, true, new Fields("id", "count"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return _delegate.getComponentConfiguration();
- }
-
- private static Map<String, SourceArgs> singleSourceArgs(String sourceComponent, SourceArgs sourceArgs) {
- Map<String, SourceArgs> ret = new HashMap<>();
- ret.put(sourceComponent, sourceArgs);
- return ret;
- }
-
- private class TimeoutItems implements TimeCacheMap.ExpiredCallback<Object, TrackingInfo> {
- @Override
- public void expire(Object id, TrackingInfo val) {
- synchronized(_tracked) {
- // the combination of the lock and the finished flag ensure that
- // an id is never timed out if it has been finished
- val.failed = true;
- if(!val.finished) {
- ((TimeoutCallback) _delegate).timeoutId(id);
- }
- }
- }
- }
-
- private TupleType getTupleType(Tuple tuple) {
- if(_idStreamSpec!=null
- && tuple.getSourceGlobalStreamId().equals(_idStreamSpec._id)) {
- return TupleType.ID;
- } else if(!_sourceArgs.isEmpty()
- && tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
- return TupleType.COORD;
- } else {
- return TupleType.REGULAR;
- }
- }
-
- static enum TupleType {
- REGULAR,
- ID,
- COORD
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/coordination/IBatchBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/coordination/IBatchBolt.java b/storm-core/src/jvm/backtype/storm/coordination/IBatchBolt.java
deleted file mode 100644
index ee5d9bd..0000000
--- a/storm-core/src/jvm/backtype/storm/coordination/IBatchBolt.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.coordination;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IComponent;
-import backtype.storm.tuple.Tuple;
-import java.io.Serializable;
-import java.util.Map;
-
-public interface IBatchBolt<T> extends Serializable, IComponent {
- void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, T id);
- void execute(Tuple tuple);
- void finishBatch();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/daemon/ClientJarTransformerRunner.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/daemon/ClientJarTransformerRunner.java b/storm-core/src/jvm/backtype/storm/daemon/ClientJarTransformerRunner.java
deleted file mode 100644
index 3a0dfbb..0000000
--- a/storm-core/src/jvm/backtype/storm/daemon/ClientJarTransformerRunner.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm.daemon;
-
-import backtype.storm.utils.Utils;
-
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.InputStream;
-
-/**
- * Main executable to load and run a jar transformer
- */
-public class ClientJarTransformerRunner {
- public static void main(String [] args) throws IOException {
- JarTransformer transformer = Utils.jarTransformer(args[0]);
- InputStream in = new FileInputStream(args[1]);
- OutputStream out = new FileOutputStream(args[2]);
- transformer.transform(in, out);
- in.close();
- out.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/daemon/DirectoryCleaner.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/daemon/DirectoryCleaner.java b/storm-core/src/jvm/backtype/storm/daemon/DirectoryCleaner.java
deleted file mode 100644
index 67b6527..0000000
--- a/storm-core/src/jvm/backtype/storm/daemon/DirectoryCleaner.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.daemon;
-
-import java.io.IOException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.nio.file.FileSystems;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.nio.file.Path;
-import java.nio.file.DirectoryStream;
-import java.util.Stack;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.Comparator;
-import java.util.PriorityQueue;
-import java.util.regex.Pattern;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provide methods to help Logviewer to clean up
- * files in directories and to get a list of files without
- * worrying about excessive memory usage.
- *
- */
-public class DirectoryCleaner {
- private static final Logger LOG = LoggerFactory.getLogger(DirectoryCleaner.class);
- // used to recognize the pattern of active log files, we may remove the "current" from this list
- private static final Pattern ACTIVE_LOG_PATTERN = Pattern.compile(".*\\.(log|err|out|current|yaml|pid)$");
- // used to recognize the pattern of some meta files in a worker log directory
- private static final Pattern META_LOG_PATTERN= Pattern.compile(".*\\.(yaml|pid)$");
-
- // not defining this as static is to allow for mocking in tests
- public DirectoryStream<Path> getStreamForDirectory(File dir) throws IOException {
- DirectoryStream<Path> stream = Files.newDirectoryStream(dir.toPath());
- return stream;
- }
-
- /**
- * If totalSize of files exceeds the either the per-worker quota or global quota,
- * Logviewer deletes oldest inactive log files in a worker directory or in all worker dirs.
- * We use the parameter for_per_dir to switch between the two deletion modes.
- * @param dirs the list of directories to be scanned for deletion
- * @param quota the per-dir quota or the total quota for the all directories
- * @param for_per_dir if true, deletion happens for a single dir; otherwise, for all directories globally
- * @param active_dirs only for global deletion, we want to skip the active logs in active_dirs
- * @return number of files deleted
- */
- public int deleteOldestWhileTooLarge(List<File> dirs,
- long quota, boolean for_per_dir, Set<String> active_dirs) throws IOException {
- final int PQ_SIZE = 1024; // max number of files to delete for every round
- final int MAX_ROUNDS = 512; // max rounds of scanning the dirs
- long totalSize = 0;
- int deletedFiles = 0;
-
- for (File dir : dirs) {
- try (DirectoryStream<Path> stream = getStreamForDirectory(dir)) {
- for (Path path : stream) {
- File file = path.toFile();
- totalSize += file.length();
- }
- }
- }
- long toDeleteSize = totalSize - quota;
- if (toDeleteSize <= 0) {
- return deletedFiles;
- }
-
- Comparator<File> comparator = new Comparator<File>() {
- public int compare(File f1, File f2) {
- if (f1.lastModified() > f2.lastModified()) {
- return -1;
- } else {
- return 1;
- }
- }
- };
- // the oldest pq_size files in this directory will be placed in PQ, with the newest at the root
- PriorityQueue<File> pq = new PriorityQueue<File>(PQ_SIZE, comparator);
- int round = 0;
- while (toDeleteSize > 0) {
- LOG.debug("To delete size is {}, start a new round of deletion, round: {}", toDeleteSize, round);
- for (File dir : dirs) {
- try (DirectoryStream<Path> stream = getStreamForDirectory(dir)) {
- for (Path path : stream) {
- File file = path.toFile();
- if (for_per_dir) {
- if (ACTIVE_LOG_PATTERN.matcher(file.getName()).matches()) {
- continue; // skip active log files
- }
- } else { // for global cleanup
- if (active_dirs.contains(dir.getCanonicalPath())) { // for an active worker's dir, make sure for the last "/"
- if (ACTIVE_LOG_PATTERN.matcher(file.getName()).matches()) {
- continue; // skip active log files
- }
- } else {
- if (META_LOG_PATTERN.matcher(file.getName()).matches()) {
- continue; // skip yaml and pid files
- }
- }
- }
- if (pq.size() < PQ_SIZE) {
- pq.offer(file);
- } else {
- if (file.lastModified() < pq.peek().lastModified()) {
- pq.poll();
- pq.offer(file);
- }
- }
- }
- }
- }
- // need to reverse the order of elements in PQ to delete files from oldest to newest
- Stack<File> stack = new Stack<File>();
- while (!pq.isEmpty()) {
- File file = pq.poll();
- stack.push(file);
- }
- while (!stack.isEmpty() && toDeleteSize > 0) {
- File file = stack.pop();
- toDeleteSize -= file.length();
- LOG.info("Delete file: {}, size: {}, lastModified: {}", file.getName(), file.length(), file.lastModified());
- file.delete();
- deletedFiles++;
- }
- pq.clear();
- round++;
- if (round >= MAX_ROUNDS) {
- if (for_per_dir) {
- LOG.warn("Reach the MAX_ROUNDS: {} during per-dir deletion, you may have too many files in " +
- "a single directory : {}, will delete the rest files in next interval.",
- MAX_ROUNDS, dirs.get(0).getCanonicalPath());
- } else {
- LOG.warn("Reach the MAX_ROUNDS: {} during global deletion, you may have too many files, " +
- "will delete the rest files in next interval.", MAX_ROUNDS);
- }
- break;
- }
- }
- return deletedFiles;
- }
-
- // Note that to avoid memory problem, we only return the first 1024 files in a directory
- public static List<File> getFilesForDir(File dir) throws IOException {
- List<File> files = new ArrayList<File>();
- final int MAX_NUM = 1024;
-
- try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir.toPath())) {
- for (Path path : stream) {
- files.add(path.toFile());
- if (files.size() >= MAX_NUM) {
- break;
- }
- }
- }
- return files;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/daemon/JarTransformer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/daemon/JarTransformer.java b/storm-core/src/jvm/backtype/storm/daemon/JarTransformer.java
deleted file mode 100644
index 914710a..0000000
--- a/storm-core/src/jvm/backtype/storm/daemon/JarTransformer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm.daemon;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A plugin that can be used to transform a jar file in nimbus before it
- * is used by a topology.
- */
-public interface JarTransformer {
- public void transform(InputStream input, OutputStream output) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/daemon/Shutdownable.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/daemon/Shutdownable.java b/storm-core/src/jvm/backtype/storm/daemon/Shutdownable.java
deleted file mode 100644
index b1d8ddf..0000000
--- a/storm-core/src/jvm/backtype/storm/daemon/Shutdownable.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.daemon;
-
-public interface Shutdownable {
- public void shutdown();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java b/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
deleted file mode 100644
index 78e8d9b..0000000
--- a/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.drpc;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
-import backtype.storm.generated.DRPCRequest;
-import backtype.storm.generated.DistributedRPCInvocations;
-import backtype.storm.generated.AuthorizationException;
-import backtype.storm.security.auth.ThriftClient;
-import backtype.storm.security.auth.ThriftConnectionType;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface {
- public static final Logger LOG = LoggerFactory.getLogger(DRPCInvocationsClient.class);
- private final AtomicReference<DistributedRPCInvocations.Client> client = new AtomicReference<>();
- private String host;
- private int port;
-
- public DRPCInvocationsClient(Map conf, String host, int port) throws TTransportException {
- super(conf, ThriftConnectionType.DRPC_INVOCATIONS, host, port, null);
- this.host = host;
- this.port = port;
- client.set(new DistributedRPCInvocations.Client(_protocol));
- }
-
- public String getHost() {
- return host;
- }
-
- public int getPort() {
- return port;
- }
-
- public void reconnectClient() throws TException {
- if (client.get() == null) {
- reconnect();
- client.set(new DistributedRPCInvocations.Client(_protocol));
- }
- }
-
- public boolean isConnected() {
- return client.get() != null;
- }
-
- public void result(String id, String result) throws TException, AuthorizationException {
- DistributedRPCInvocations.Client c = client.get();
- try {
- if (c == null) {
- throw new TException("Client is not connected...");
- }
- c.result(id, result);
- } catch(AuthorizationException aze) {
- throw aze;
- } catch(TException e) {
- client.compareAndSet(c, null);
- throw e;
- }
- }
-
- public DRPCRequest fetchRequest(String func) throws TException, AuthorizationException {
- DistributedRPCInvocations.Client c = client.get();
- try {
- if (c == null) {
- throw new TException("Client is not connected...");
- }
- return c.fetchRequest(func);
- } catch(AuthorizationException aze) {
- throw aze;
- } catch(TException e) {
- client.compareAndSet(c, null);
- throw e;
- }
- }
-
- public void failRequest(String id) throws TException, AuthorizationException {
- DistributedRPCInvocations.Client c = client.get();
- try {
- if (c == null) {
- throw new TException("Client is not connected...");
- }
- c.failRequest(id);
- } catch(AuthorizationException aze) {
- throw aze;
- } catch(TException e) {
- client.compareAndSet(c, null);
- throw e;
- }
- }
-
- public DistributedRPCInvocations.Client getClient() {
- return client.get();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java b/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java
deleted file mode 100644
index 4ed15c0..0000000
--- a/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.drpc;
-
-import backtype.storm.Config;
-import backtype.storm.ILocalDRPC;
-import backtype.storm.generated.DRPCRequest;
-import backtype.storm.generated.DistributedRPCInvocations;
-import backtype.storm.generated.AuthorizationException;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.ExtendedThreadPoolExecutor;
-import backtype.storm.utils.ServiceRegistry;
-import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.Callable;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.thrift.TException;
-import org.json.simple.JSONValue;
-
-public class DRPCSpout extends BaseRichSpout {
- //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
- static final long serialVersionUID = 2387848310969237877L;
-
- public static final Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
-
- SpoutOutputCollector _collector;
- List<DRPCInvocationsClient> _clients = new ArrayList<>();
- transient LinkedList<Future<Void>> _futures = null;
- transient ExecutorService _backround = null;
- String _function;
- String _local_drpc_id = null;
-
- private static class DRPCMessageId {
- String id;
- int index;
-
- public DRPCMessageId(String id, int index) {
- this.id = id;
- this.index = index;
- }
- }
-
-
- public DRPCSpout(String function) {
- _function = function;
- }
-
- public DRPCSpout(String function, ILocalDRPC drpc) {
- _function = function;
- _local_drpc_id = drpc.getServiceId();
- }
-
- public String get_function() {
- return _function;
- }
-
- private class Adder implements Callable<Void> {
- private String server;
- private int port;
- private Map conf;
-
- public Adder(String server, int port, Map conf) {
- this.server = server;
- this.port = port;
- this.conf = conf;
- }
-
- @Override
- public Void call() throws Exception {
- DRPCInvocationsClient c = new DRPCInvocationsClient(conf, server, port);
- synchronized (_clients) {
- _clients.add(c);
- }
- return null;
- }
- }
-
- private void reconnect(final DRPCInvocationsClient c) {
- _futures.add(_backround.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- c.reconnectClient();
- return null;
- }
- }));
- }
-
- private void checkFutures() {
- Iterator<Future<Void>> i = _futures.iterator();
- while (i.hasNext()) {
- Future<Void> f = i.next();
- if (f.isDone()) {
- i.remove();
- }
- try {
- f.get();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- _collector = collector;
- if(_local_drpc_id==null) {
- _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
- _futures = new LinkedList<>();
-
- int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
- int index = context.getThisTaskIndex();
-
- int port = Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
- List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS);
- if(servers == null || servers.isEmpty()) {
- throw new RuntimeException("No DRPC servers configured for topology");
- }
-
- if (numTasks < servers.size()) {
- for (String s: servers) {
- _futures.add(_backround.submit(new Adder(s, port, conf)));
- }
- } else {
- int i = index % servers.size();
- _futures.add(_backround.submit(new Adder(servers.get(i), port, conf)));
- }
- }
-
- }
-
- @Override
- public void close() {
- for(DRPCInvocationsClient client: _clients) {
- client.close();
- }
- }
-
- @Override
- public void nextTuple() {
- boolean gotRequest = false;
- if(_local_drpc_id==null) {
- int size;
- synchronized (_clients) {
- size = _clients.size(); //This will only ever grow, so no need to worry about falling off the end
- }
- for(int i=0; i<size; i++) {
- DRPCInvocationsClient client;
- synchronized (_clients) {
- client = _clients.get(i);
- }
- if (!client.isConnected()) {
- continue;
- }
- try {
- DRPCRequest req = client.fetchRequest(_function);
- if(req.get_request_id().length() > 0) {
- Map returnInfo = new HashMap();
- returnInfo.put("id", req.get_request_id());
- returnInfo.put("host", client.getHost());
- returnInfo.put("port", client.getPort());
- gotRequest = true;
- _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), i));
- break;
- }
- } catch (AuthorizationException aze) {
- reconnect(client);
- LOG.error("Not authorized to fetch DRPC result from DRPC server", aze);
- } catch (TException e) {
- reconnect(client);
- LOG.error("Failed to fetch DRPC result from DRPC server", e);
- } catch (Exception e) {
- LOG.error("Failed to fetch DRPC result from DRPC server", e);
- }
- }
- checkFutures();
- } else {
- DistributedRPCInvocations.Iface drpc = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
- if(drpc!=null) { // can happen during shutdown of drpc while topology is still up
- try {
- DRPCRequest req = drpc.fetchRequest(_function);
- if(req.get_request_id().length() > 0) {
- Map returnInfo = new HashMap();
- returnInfo.put("id", req.get_request_id());
- returnInfo.put("host", _local_drpc_id);
- returnInfo.put("port", 0);
- gotRequest = true;
- _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), 0));
- }
- } catch (AuthorizationException aze) {
- throw new RuntimeException(aze);
- } catch (TException e) {
- throw new RuntimeException(e);
- }
- }
- }
- if(!gotRequest) {
- Utils.sleep(1);
- }
- }
-
- @Override
- public void ack(Object msgId) {
- }
-
- @Override
- public void fail(Object msgId) {
- DRPCMessageId did = (DRPCMessageId) msgId;
- DistributedRPCInvocations.Iface client;
-
- if(_local_drpc_id == null) {
- client = _clients.get(did.index);
- } else {
- client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
- }
- try {
- client.failRequest(did.id);
- } catch (AuthorizationException aze) {
- LOG.error("Not authorized to failREquest from DRPC server", aze);
- } catch (TException e) {
- LOG.error("Failed to fail request", e);
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("args", "return-info"));
- }
-}