You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/07/05 02:49:11 UTC
[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java
[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361929#comment-15361929 ]
ASF GitHub Bot commented on STORM-1277:
---------------------------------------
Github user lujinhong commented on a diff in the pull request:
https://github.com/apache/storm/pull/1445#discussion_r69502037
--- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
+
+ private final ISpoutWaitStrategy spoutWaitStrategy;
+ private Integer maxSpoutPending;
+ private final AtomicBoolean lastActive;
+ private List<ISpout> spouts;
+ private List<SpoutOutputCollector> outputCollectors;
+ private final MutableLong emittedCount;
+ private final MutableLong emptyEmitStreak;
+ private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+ private final boolean hasAckers;
+ private RotatingMap<Long, TupleInfo> pending;
+ private final boolean backPressureEnabled;
+
+ public SpoutExecutor(final Map workerData, final List<Long> executorId, Map<String, String> credentials) {
+ super(workerData, executorId, credentials);
+ this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+ this.spoutWaitStrategy.prepare(stormConf);
+
+ this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+ this.lastActive = new AtomicBoolean(false);
+ this.hasAckers = StormCommon.hasAckers(stormConf);
+ this.emittedCount = new MutableLong(0);
+ this.emptyEmitStreak = new MutableLong(0);
+ this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+ }
+
+ @Override
+ public void init(final Map<Integer, Task> idToTask) {
+ LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+ this.idToTask = idToTask;
+ this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
--- End diff --
The default value of TOPOLOGY_MAX_SPOUT_PENDING may be 1, rather than the 0 used as the fallback here.
> port backtype.storm.daemon.executor to java
> -------------------------------------------
>
> Key: STORM-1277
> URL: https://issues.apache.org/jira/browse/STORM-1277
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Cody
> Labels: java-migration, jstorm-merger
>
> The rough JStorm equivalent is https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/task (only approximately, since tasks and executors are combined in jstorm).
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)