You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/15 18:44:53 UTC

[20/30] storm git commit: upmerge from master

upmerge from master


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/39ea23cd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/39ea23cd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/39ea23cd

Branch: refs/heads/master
Commit: 39ea23cdbbf64b3844205ce759701ec78d688c4a
Parents: 4e0ff2f 96f81d7
Author: 卫乐 <we...@taobao.com>
Authored: Sat Mar 5 20:02:42 2016 +0800
Committer: 卫乐 <we...@taobao.com>
Committed: Sat Mar 5 20:02:42 2016 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |   7 +
 .../src/clj/org/apache/storm/MockAutoCred.clj   |  58 --
 .../clj/org/apache/storm/daemon/supervisor.clj  |  15 +
 .../storm/cluster/StormClusterStateImpl.java    |   7 +-
 .../storm/daemon/metrics/MetricsUtils.java      |   2 +-
 .../jvm/org/apache/storm/drpc/DRPCSpout.java    |   2 +
 .../apache/storm/security/auth/AuthUtils.java   |  40 +
 .../storm/security/auth/kerberos/AutoTGT.java   |  64 +-
 .../auth/kerberos/AutoTGTKrb5LoginModule.java   |   8 +-
 .../apache/storm/stats/BoltExecutorStats.java   |   6 +-
 .../jvm/org/apache/storm/stats/CommonStats.java |  12 +-
 .../apache/storm/stats/SpoutExecutorStats.java  |   5 +-
 .../jvm/org/apache/storm/stats/StatsUtil.java   | 781 ++++++++++---------
 .../jvm/org/apache/storm/utils/ConfigUtils.java |  10 +
 .../test/clj/org/apache/storm/nimbus_test.clj   |  10 +-
 .../security/auth/auto_login_module_test.clj    |  24 +-
 .../clj/org/apache/storm/supervisor_test.clj    |   6 +
 .../test/jvm/org/apache/storm/MockAutoCred.java |  75 ++
 18 files changed, 623 insertions(+), 509 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/39ea23cd/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
index f6dad09,0000000..d8c7f06
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
@@@ -1,118 -1,0 +1,114 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.stats;
 +
 +import com.google.common.collect.Lists;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
- import org.apache.storm.generated.BoltStats;
- import org.apache.storm.generated.ExecutorSpecificStats;
- import org.apache.storm.generated.ExecutorStats;
- import org.apache.storm.generated.SpoutStats;
 +import org.apache.storm.metric.internal.MultiCountStatAndMetric;
 +import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
 +
 +@SuppressWarnings("unchecked")
 +public class BoltExecutorStats extends CommonStats {
 +
 +    public static final String ACKED = "acked";
 +    public static final String FAILED = "failed";
 +    public static final String EXECUTED = "executed";
 +    public static final String PROCESS_LATENCIES = "process-latencies";
 +    public static final String EXECUTE_LATENCIES = "execute-latencies";
 +
 +    public static final String[] BOLT_FIELDS = {ACKED, FAILED, EXECUTED, PROCESS_LATENCIES, EXECUTE_LATENCIES};
 +
 +    public BoltExecutorStats(int rate) {
 +        super(rate);
 +
 +        this.put(ACKED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS));
 +        this.put(FAILED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS));
 +        this.put(EXECUTED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS));
 +        this.put(PROCESS_LATENCIES, new MultiLatencyStatAndMetric(NUM_STAT_BUCKETS));
 +        this.put(EXECUTE_LATENCIES, new MultiLatencyStatAndMetric(NUM_STAT_BUCKETS));
 +    }
 +
 +    public MultiCountStatAndMetric getAcked() {
 +        return (MultiCountStatAndMetric) this.get(ACKED);
 +    }
 +
 +    public MultiCountStatAndMetric getFailed() {
 +        return (MultiCountStatAndMetric) this.get(FAILED);
 +    }
 +
 +    public MultiCountStatAndMetric getExecuted() {
 +        return (MultiCountStatAndMetric) this.get(EXECUTED);
 +    }
 +
 +    public MultiLatencyStatAndMetric getProcessLatencies() {
 +        return (MultiLatencyStatAndMetric) this.get(PROCESS_LATENCIES);
 +    }
 +
 +    public MultiLatencyStatAndMetric getExecuteLatencies() {
 +        return (MultiLatencyStatAndMetric) this.get(EXECUTE_LATENCIES);
 +    }
 +
 +    public void boltExecuteTuple(String component, String stream, long latencyMs) {
 +        List key = Lists.newArrayList(component, stream);
 +        this.getExecuted().incBy(key, this.rate);
 +        this.getExecuteLatencies().record(key, latencyMs);
 +    }
 +
 +    public void boltAckedTuple(String component, String stream, long latencyMs) {
 +        List key = Lists.newArrayList(component, stream);
 +        this.getAcked().incBy(key, this.rate);
 +        this.getProcessLatencies().record(key, latencyMs);
 +    }
 +
 +    public void boltFailedTuple(String component, String stream, long latencyMs) {
 +        List key = Lists.newArrayList(component, stream);
 +        this.getFailed().incBy(key, this.rate);
 +
 +    }
 +
 +    public Map renderStats() {
 +        cleanupStats();
 +        Map ret = new HashMap();
 +        ret.putAll(valueStats(CommonStats.COMMON_FIELDS));
 +        ret.putAll(valueStats(BoltExecutorStats.BOLT_FIELDS));
-         StatsUtil.putRawKV(ret, StatsUtil.TYPE, StatsUtil.KW_BOLT);
++        StatsUtil.putKV(ret, StatsUtil.TYPE, StatsUtil.KW_BOLT);
 +
 +        return ret;
 +    }
 +
 +//    public ExecutorStats renderStats() {
 +//        cleanupStats();
 +//
 +//        ExecutorStats ret = new ExecutorStats();
 +//        ret.set_emitted(valueStat(EMITTED));
 +//        ret.set_transferred(valueStat(TRANSFERRED));
 +//        ret.set_rate(this.rate);
 +//
 +//        BoltStats boltStats = new BoltStats(
 +//                StatsUtil.windowSetConverter(valueStat(ACKED), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
 +//                StatsUtil.windowSetConverter(valueStat(FAILED), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
 +//                StatsUtil.windowSetConverter(valueStat(PROCESS_LATENCIES), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
 +//                StatsUtil.windowSetConverter(valueStat(EXECUTED), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
 +//                StatsUtil.windowSetConverter(valueStat(EXECUTE_LATENCIES), StatsUtil.TO_GSID, StatsUtil.IDENTITY));
 +//        ret.set_specific(ExecutorSpecificStats.bolt(boltStats));
 +//
 +//        return ret;
 +//    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/39ea23cd/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
index e386413,0000000..f7826f9
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
@@@ -1,112 -1,0 +1,112 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.stats;
 +
 +import java.util.HashMap;
 +import java.util.Map;
 +import org.apache.storm.metric.api.IMetric;
 +import org.apache.storm.metric.internal.MultiCountStatAndMetric;
 +import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
 +
 +@SuppressWarnings("unchecked")
 +public class CommonStats {
 +    public static final int NUM_STAT_BUCKETS = 20;
 +
 +    public static final String RATE = "rate";
 +
 +    public static final String EMITTED = "emitted";
 +    public static final String TRANSFERRED = "transferred";
 +    public static final String[] COMMON_FIELDS = {EMITTED, TRANSFERRED};
 +
 +    protected final int rate;
-     protected final Map metricMap = new HashMap();
++    protected final Map<String, IMetric> metricMap = new HashMap<>();
 +
 +    public CommonStats(int rate) {
 +        this.rate = rate;
 +        this.put(EMITTED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS));
 +        this.put(TRANSFERRED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS));
 +    }
 +
 +    public int getRate() {
 +        return this.rate;
 +    }
 +
 +    public MultiCountStatAndMetric getEmitted() {
 +        return (MultiCountStatAndMetric) get(EMITTED);
 +    }
 +
 +    public MultiCountStatAndMetric getTransferred() {
 +        return (MultiCountStatAndMetric) get(TRANSFERRED);
 +    }
 +
 +    public IMetric get(String field) {
-         return (IMetric) StatsUtil.getByKeyword(metricMap, field);
++        return (IMetric) StatsUtil.getByKey(metricMap, field);
 +    }
 +
 +    protected void put(String field, Object value) {
-         StatsUtil.putRawKV(metricMap, field, value);
++        StatsUtil.putKV(metricMap, field, value);
 +    }
 +
 +    public void emittedTuple(String stream) {
 +        this.getEmitted().incBy(stream, this.rate);
 +    }
 +
 +    public void transferredTuples(String stream, int amount) {
 +        this.getTransferred().incBy(stream, this.rate * amount);
 +    }
 +
 +    public void cleanupStats() {
 +        for (Object imetric : this.metricMap.values()) {
 +            cleanupStat((IMetric) imetric);
 +        }
 +    }
 +
 +    private void cleanupStat(IMetric metric) {
 +        if (metric instanceof MultiCountStatAndMetric) {
 +            ((MultiCountStatAndMetric) metric).close();
 +        } else if (metric instanceof MultiLatencyStatAndMetric) {
 +            ((MultiLatencyStatAndMetric) metric).close();
 +        }
 +    }
 +
 +    protected Map valueStats(String[] fields) {
 +        Map ret = new HashMap();
 +        for (String field : fields) {
 +            IMetric metric = this.get(field);
 +            if (metric instanceof MultiCountStatAndMetric) {
-                 StatsUtil.putRawKV(ret, field, ((MultiCountStatAndMetric) metric).getTimeCounts());
++                StatsUtil.putKV(ret, field, ((MultiCountStatAndMetric) metric).getTimeCounts());
 +            } else if (metric instanceof MultiLatencyStatAndMetric) {
-                 StatsUtil.putRawKV(ret, field, ((MultiLatencyStatAndMetric) metric).getTimeLatAvg());
++                StatsUtil.putKV(ret, field, ((MultiLatencyStatAndMetric) metric).getTimeLatAvg());
 +            }
 +        }
-         StatsUtil.putRawKV(ret, CommonStats.RATE, this.getRate());
++        StatsUtil.putKV(ret, CommonStats.RATE, this.getRate());
 +
 +        return ret;
 +    }
 +
 +    protected Map valueStat(String field) {
 +        IMetric metric = this.get(field);
 +        if (metric instanceof MultiCountStatAndMetric) {
 +            return ((MultiCountStatAndMetric) metric).getTimeCounts();
 +        } else if (metric instanceof MultiLatencyStatAndMetric) {
 +            return ((MultiLatencyStatAndMetric) metric).getTimeLatAvg();
 +        }
 +        return null;
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/39ea23cd/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
index 918ae06,0000000..27c626e
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
@@@ -1,89 -1,0 +1,86 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.stats;
 +
 +import java.util.HashMap;
 +import java.util.Map;
- import org.apache.storm.generated.ExecutorSpecificStats;
- import org.apache.storm.generated.ExecutorStats;
- import org.apache.storm.generated.SpoutStats;
 +import org.apache.storm.metric.internal.MultiCountStatAndMetric;
 +import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
 +
 +@SuppressWarnings("unchecked")
 +public class SpoutExecutorStats extends CommonStats {
 +
 +    public static final String ACKED = "acked";
 +    public static final String FAILED = "failed";
 +    public static final String COMPLETE_LATENCIES = "complete-latencies";
 +
 +    public static final String[] SPOUT_FIELDS = {ACKED, FAILED, COMPLETE_LATENCIES};
 +
 +    public SpoutExecutorStats(int rate) {
 +        super(rate);
 +        this.put(ACKED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS));
 +        this.put(FAILED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS));
 +        this.put(COMPLETE_LATENCIES, new MultiLatencyStatAndMetric(NUM_STAT_BUCKETS));
 +    }
 +
 +    public MultiCountStatAndMetric getAcked() {
 +        return (MultiCountStatAndMetric) this.get(ACKED);
 +    }
 +
 +    public MultiCountStatAndMetric getFailed() {
 +        return (MultiCountStatAndMetric) this.get(FAILED);
 +    }
 +
 +    public MultiLatencyStatAndMetric getCompleteLatencies() {
 +        return (MultiLatencyStatAndMetric) this.get(COMPLETE_LATENCIES);
 +    }
 +
 +    public void spoutAckedTuple(String stream, long latencyMs) {
 +        this.getAcked().incBy(stream, this.rate);
 +        this.getCompleteLatencies().record(stream, latencyMs);
 +    }
 +
 +    public void spoutFailedTuple(String stream, long latencyMs) {
 +        this.getFailed().incBy(stream, this.rate);
 +    }
 +
 +    public Map renderStats() {
 +        cleanupStats();
 +        Map ret = new HashMap();
 +        ret.putAll(valueStats(CommonStats.COMMON_FIELDS));
 +        ret.putAll(valueStats(SpoutExecutorStats.SPOUT_FIELDS));
-         StatsUtil.putRawKV(ret, StatsUtil.TYPE, StatsUtil.KW_SPOUT);
++        StatsUtil.putKV(ret, StatsUtil.TYPE, StatsUtil.KW_SPOUT);
 +
 +        return ret;
 +    }
 +
 +//    public ExecutorStats renderStats() {
 +//        cleanupStats();
 +//
 +//        ExecutorStats ret = new ExecutorStats();
 +//        ret.set_emitted(valueStat(EMITTED));
 +//        ret.set_transferred(valueStat(TRANSFERRED));
 +//        ret.set_rate(this.rate);
 +//
 +//        SpoutStats spoutStats = new SpoutStats(
 +//                valueStat(ACKED), valueStat(FAILED), valueStat(COMPLETE_LATENCIES));
 +//        ret.set_specific(ExecutorSpecificStats.spout(spoutStats));
 +//
 +//        return ret;
 +//    }
 +}