You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/07/05 02:49:11 UTC

[jira] [Commented] (STORM-1277) port backtype.storm.daemon.executor to java

    [ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361929#comment-15361929 ] 

ASF GitHub Bot commented on STORM-1277:
---------------------------------------

Github user lujinhong commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1445#discussion_r69502037
  
    --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
    @@ -0,0 +1,243 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.executor.spout;
    +
    +import com.google.common.collect.ImmutableMap;
    +import org.apache.storm.Config;
    +import org.apache.storm.Constants;
    +import org.apache.storm.ICredentialsListener;
    +import org.apache.storm.daemon.Acker;
    +import org.apache.storm.daemon.StormCommon;
    +import org.apache.storm.daemon.Task;
    +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
    +import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
    +import org.apache.storm.executor.Executor;
    +import org.apache.storm.executor.TupleInfo;
    +import org.apache.storm.hooks.info.SpoutAckInfo;
    +import org.apache.storm.hooks.info.SpoutFailInfo;
    +import org.apache.storm.spout.ISpout;
    +import org.apache.storm.spout.ISpoutWaitStrategy;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.stats.SpoutExecutorStats;
    +import org.apache.storm.tuple.TupleImpl;
    +import org.apache.storm.utils.*;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +public class SpoutExecutor extends Executor {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);
    +
    +    private final ISpoutWaitStrategy spoutWaitStrategy;
    +    private Integer maxSpoutPending;
    +    private final AtomicBoolean lastActive;
    +    private List<ISpout> spouts;
    +    private List<SpoutOutputCollector> outputCollectors;
    +    private final MutableLong emittedCount;
    +    private final MutableLong emptyEmitStreak;
    +    private final SpoutThrottlingMetrics spoutThrottlingMetrics;
    +    private final boolean hasAckers;
    +    private RotatingMap<Long, TupleInfo> pending;
    +    private final boolean backPressureEnabled;
    +
    +    public SpoutExecutor(final Map workerData, final List<Long> executorId, Map<String, String> credentials) {
    +        super(workerData, executorId, credentials);
    +        this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
    +        this.spoutWaitStrategy.prepare(stormConf);
    +
    +        this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
    +
    +        this.lastActive = new AtomicBoolean(false);
    +        this.hasAckers = StormCommon.hasAckers(stormConf);
    +        this.emittedCount = new MutableLong(0);
    +        this.emptyEmitStreak = new MutableLong(0);
    +        this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
    +    }
    +
    +    @Override
    +    public void init(final Map<Integer, Task> idToTask) {
    +        LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
    +        this.idToTask = idToTask;
    +        this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
    --- End diff --
    
    The default value of TOPOLOGY_MAX_SPOUT_PENDING may be 1, not the 0 used as the fallback here.


> port backtype.storm.daemon.executor to java
> -------------------------------------------
>
>                 Key: STORM-1277
>                 URL: https://issues.apache.org/jira/browse/STORM-1277
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Cody
>              Labels: java-migration, jstorm-merger
>
> The closest JStorm counterpart is https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/task, though only roughly: tasks and executors are combined in JStorm.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)