You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/15 18:44:53 UTC
[20/30] storm git commit: upmerge from master
upmerge from master
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/39ea23cd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/39ea23cd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/39ea23cd
Branch: refs/heads/master
Commit: 39ea23cdbbf64b3844205ce759701ec78d688c4a
Parents: 4e0ff2f 96f81d7
Author: 卫乐 <we...@taobao.com>
Authored: Sat Mar 5 20:02:42 2016 +0800
Committer: 卫乐 <we...@taobao.com>
Committed: Sat Mar 5 20:02:42 2016 +0800
----------------------------------------------------------------------
CHANGELOG.md | 7 +
.../src/clj/org/apache/storm/MockAutoCred.clj | 58 --
.../clj/org/apache/storm/daemon/supervisor.clj | 15 +
.../storm/cluster/StormClusterStateImpl.java | 7 +-
.../storm/daemon/metrics/MetricsUtils.java | 2 +-
.../jvm/org/apache/storm/drpc/DRPCSpout.java | 2 +
.../apache/storm/security/auth/AuthUtils.java | 40 +
.../storm/security/auth/kerberos/AutoTGT.java | 64 +-
.../auth/kerberos/AutoTGTKrb5LoginModule.java | 8 +-
.../apache/storm/stats/BoltExecutorStats.java | 6 +-
.../jvm/org/apache/storm/stats/CommonStats.java | 12 +-
.../apache/storm/stats/SpoutExecutorStats.java | 5 +-
.../jvm/org/apache/storm/stats/StatsUtil.java | 781 ++++++++++---------
.../jvm/org/apache/storm/utils/ConfigUtils.java | 10 +
.../test/clj/org/apache/storm/nimbus_test.clj | 10 +-
.../security/auth/auto_login_module_test.clj | 24 +-
.../clj/org/apache/storm/supervisor_test.clj | 6 +
.../test/jvm/org/apache/storm/MockAutoCred.java | 75 ++
18 files changed, 623 insertions(+), 509 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/39ea23cd/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
index f6dad09,0000000..d8c7f06
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
@@@ -1,118 -1,0 +1,114 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.stats;
+
+import com.google.common.collect.Lists;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
- import org.apache.storm.generated.BoltStats;
- import org.apache.storm.generated.ExecutorSpecificStats;
- import org.apache.storm.generated.ExecutorStats;
- import org.apache.storm.generated.SpoutStats;
+import org.apache.storm.metric.internal.MultiCountStatAndMetric;
+import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
+
+@SuppressWarnings("unchecked")
+public class BoltExecutorStats extends CommonStats { // bolt executor stats: adds acked/failed/executed counters and process/execute latency metrics to the common ones
+
+ public static final String ACKED = "acked";
+ public static final String FAILED = "failed";
+ public static final String EXECUTED = "executed";
+ public static final String PROCESS_LATENCIES = "process-latencies";
+ public static final String EXECUTE_LATENCIES = "execute-latencies";
+
+ public static final String[] BOLT_FIELDS = {ACKED, FAILED, EXECUTED, PROCESS_LATENCIES, EXECUTE_LATENCIES}; // bolt-specific field names handed to valueStats() by renderStats()
+
+ public BoltExecutorStats(int rate) {
+ super(rate); // the CommonStats constructor stores rate and registers EMITTED and TRANSFERRED
+
+ this.put(ACKED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS));
+ this.put(FAILED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS));
+ this.put(EXECUTED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS));
+ this.put(PROCESS_LATENCIES, new MultiLatencyStatAndMetric(NUM_STAT_BUCKETS));
+ this.put(EXECUTE_LATENCIES, new MultiLatencyStatAndMetric(NUM_STAT_BUCKETS));
+ }
+
+ public MultiCountStatAndMetric getAcked() {
+ return (MultiCountStatAndMetric) this.get(ACKED); // cast matches the type registered under ACKED in the constructor
+ }
+
+ public MultiCountStatAndMetric getFailed() {
+ return (MultiCountStatAndMetric) this.get(FAILED);
+ }
+
+ public MultiCountStatAndMetric getExecuted() {
+ return (MultiCountStatAndMetric) this.get(EXECUTED);
+ }
+
+ public MultiLatencyStatAndMetric getProcessLatencies() {
+ return (MultiLatencyStatAndMetric) this.get(PROCESS_LATENCIES);
+ }
+
+ public MultiLatencyStatAndMetric getExecuteLatencies() {
+ return (MultiLatencyStatAndMetric) this.get(EXECUTE_LATENCIES);
+ }
+
+ public void boltExecuteTuple(String component, String stream, long latencyMs) { // updates the executed counter and the execute-latency metric
+ List key = Lists.newArrayList(component, stream); // bolt metrics are keyed by the [component, stream] pair
+ this.getExecuted().incBy(key, this.rate); // advances by rate rather than 1 - presumably scales sampled tuples; TODO confirm
+ this.getExecuteLatencies().record(key, latencyMs);
+ }
+
+ public void boltAckedTuple(String component, String stream, long latencyMs) { // updates the acked counter and the process-latency metric
+ List key = Lists.newArrayList(component, stream);
+ this.getAcked().incBy(key, this.rate);
+ this.getProcessLatencies().record(key, latencyMs);
+ }
+
+ public void boltFailedTuple(String component, String stream, long latencyMs) { // NOTE(review): latencyMs is accepted but never recorded here
+ List key = Lists.newArrayList(component, stream);
+ this.getFailed().incBy(key, this.rate);
+
+ }
+
+ public Map renderStats() { // builds a fresh map of the common and bolt field values plus a TYPE entry
+ cleanupStats(); // calls close() on every registered count/latency metric before the values are read
+ Map ret = new HashMap();
+ ret.putAll(valueStats(CommonStats.COMMON_FIELDS)); // each valueStats() call also writes the RATE entry
+ ret.putAll(valueStats(BoltExecutorStats.BOLT_FIELDS));
- StatsUtil.putRawKV(ret, StatsUtil.TYPE, StatsUtil.KW_BOLT);
++ StatsUtil.putKV(ret, StatsUtil.TYPE, StatsUtil.KW_BOLT); // stores KW_BOLT under the TYPE key
+
+ return ret;
+ }
+
+// public ExecutorStats renderStats() {
+// cleanupStats();
+//
+// ExecutorStats ret = new ExecutorStats();
+// ret.set_emitted(valueStat(EMITTED));
+// ret.set_transferred(valueStat(TRANSFERRED));
+// ret.set_rate(this.rate);
+//
+// BoltStats boltStats = new BoltStats(
+// StatsUtil.windowSetConverter(valueStat(ACKED), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
+// StatsUtil.windowSetConverter(valueStat(FAILED), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
+// StatsUtil.windowSetConverter(valueStat(PROCESS_LATENCIES), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
+// StatsUtil.windowSetConverter(valueStat(EXECUTED), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
+// StatsUtil.windowSetConverter(valueStat(EXECUTE_LATENCIES), StatsUtil.TO_GSID, StatsUtil.IDENTITY));
+// ret.set_specific(ExecutorSpecificStats.bolt(boltStats));
+//
+// return ret;
+// }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/39ea23cd/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
index e386413,0000000..f7826f9
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
@@@ -1,112 -1,0 +1,112 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.internal.MultiCountStatAndMetric;
+import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
+
+@SuppressWarnings("unchecked")
+public class CommonStats { // base stats holder: keeps metrics by field name and provides the emitted/transferred counters
+ public static final int NUM_STAT_BUCKETS = 20; // constructor argument for every MultiCountStatAndMetric / MultiLatencyStatAndMetric created in this class
+
+ public static final String RATE = "rate";
+
+ public static final String EMITTED = "emitted";
+ public static final String TRANSFERRED = "transferred";
+ public static final String[] COMMON_FIELDS = {EMITTED, TRANSFERRED};
+
+ protected final int rate; // amount added per counted tuple (see emittedTuple / transferredTuples)
- protected final Map metricMap = new HashMap();
++ protected final Map<String, IMetric> metricMap = new HashMap<>(); // field name -> metric, filled through put()
+
+ public CommonStats(int rate) {
+ this.rate = rate;
+ this.put(EMITTED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS));
+ this.put(TRANSFERRED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS));
+ }
+
+ public int getRate() {
+ return this.rate;
+ }
+
+ public MultiCountStatAndMetric getEmitted() {
+ return (MultiCountStatAndMetric) get(EMITTED);
+ }
+
+ public MultiCountStatAndMetric getTransferred() {
+ return (MultiCountStatAndMetric) get(TRANSFERRED);
+ }
+
+ public IMetric get(String field) { // looks up the metric registered under the given field name
- return (IMetric) StatsUtil.getByKeyword(metricMap, field);
++ return (IMetric) StatsUtil.getByKey(metricMap, field); // lookup is delegated to StatsUtil; its behaviour for a missing key is not visible here
+ }
+
+ protected void put(String field, Object value) { // NOTE(review): value is declared Object although metricMap is typed to IMetric - the calls visible here pass only count/latency metrics
- StatsUtil.putRawKV(metricMap, field, value);
++ StatsUtil.putKV(metricMap, field, value);
+ }
+
+ public void emittedTuple(String stream) {
+ this.getEmitted().incBy(stream, this.rate); // keyed by stream; advances by rate rather than 1
+ }
+
+ public void transferredTuples(String stream, int amount) {
+ this.getTransferred().incBy(stream, this.rate * amount); // increment is rate * amount, computed in int arithmetic
+ }
+
+ public void cleanupStats() { // runs cleanupStat() over every registered metric
+ for (Object imetric : this.metricMap.values()) {
+ cleanupStat((IMetric) imetric); // NOTE(review): cast is only needed because the loop variable is Object; the typed metricMap already yields IMetric
+ }
+ }
+
+ private void cleanupStat(IMetric metric) { // closes count and latency metrics; any other IMetric is left untouched
+ if (metric instanceof MultiCountStatAndMetric) {
+ ((MultiCountStatAndMetric) metric).close();
+ } else if (metric instanceof MultiLatencyStatAndMetric) {
+ ((MultiLatencyStatAndMetric) metric).close();
+ }
+ }
+
+ protected Map valueStats(String[] fields) { // returns a new map: one entry per recognised field plus the RATE entry
+ Map ret = new HashMap();
+ for (String field : fields) {
+ IMetric metric = this.get(field);
+ if (metric instanceof MultiCountStatAndMetric) {
- StatsUtil.putRawKV(ret, field, ((MultiCountStatAndMetric) metric).getTimeCounts());
++ StatsUtil.putKV(ret, field, ((MultiCountStatAndMetric) metric).getTimeCounts());
+ } else if (metric instanceof MultiLatencyStatAndMetric) {
- StatsUtil.putRawKV(ret, field, ((MultiLatencyStatAndMetric) metric).getTimeLatAvg());
++ StatsUtil.putKV(ret, field, ((MultiLatencyStatAndMetric) metric).getTimeLatAvg());
+ } // a field whose metric is neither type (or is null) gets no entry
+ }
- StatsUtil.putRawKV(ret, CommonStats.RATE, this.getRate());
++ StatsUtil.putKV(ret, CommonStats.RATE, this.getRate()); // rate is always written, whatever fields were requested
+
+ return ret;
+ }
+
+ protected Map valueStat(String field) { // single-field variant of valueStats(): returns the metric's map directly, without a RATE entry
+ IMetric metric = this.get(field);
+ if (metric instanceof MultiCountStatAndMetric) {
+ return ((MultiCountStatAndMetric) metric).getTimeCounts();
+ } else if (metric instanceof MultiLatencyStatAndMetric) {
+ return ((MultiLatencyStatAndMetric) metric).getTimeLatAvg();
+ }
+ return null; // the field holds no count/latency metric
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/39ea23cd/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
index 918ae06,0000000..27c626e
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
@@@ -1,89 -1,0 +1,86 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.stats;
+
+import java.util.HashMap;
+import java.util.Map;
- import org.apache.storm.generated.ExecutorSpecificStats;
- import org.apache.storm.generated.ExecutorStats;
- import org.apache.storm.generated.SpoutStats;
+import org.apache.storm.metric.internal.MultiCountStatAndMetric;
+import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
+
+@SuppressWarnings("unchecked")
+public class SpoutExecutorStats extends CommonStats { // spout executor stats: adds acked/failed counters and a complete-latency metric to the common ones
+
+ public static final String ACKED = "acked";
+ public static final String FAILED = "failed";
+ public static final String COMPLETE_LATENCIES = "complete-latencies";
+
+ public static final String[] SPOUT_FIELDS = {ACKED, FAILED, COMPLETE_LATENCIES}; // spout-specific field names handed to valueStats() by renderStats()
+
+ public SpoutExecutorStats(int rate) {
+ super(rate); // the CommonStats constructor stores rate and registers EMITTED and TRANSFERRED
+ this.put(ACKED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS));
+ this.put(FAILED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS));
+ this.put(COMPLETE_LATENCIES, new MultiLatencyStatAndMetric(NUM_STAT_BUCKETS));
+ }
+
+ public MultiCountStatAndMetric getAcked() {
+ return (MultiCountStatAndMetric) this.get(ACKED); // cast matches the type registered under ACKED in the constructor
+ }
+
+ public MultiCountStatAndMetric getFailed() {
+ return (MultiCountStatAndMetric) this.get(FAILED);
+ }
+
+ public MultiLatencyStatAndMetric getCompleteLatencies() {
+ return (MultiLatencyStatAndMetric) this.get(COMPLETE_LATENCIES);
+ }
+
+ public void spoutAckedTuple(String stream, long latencyMs) { // updates the acked counter and the complete-latency metric
+ this.getAcked().incBy(stream, this.rate); // keyed by stream only; advances by rate rather than 1
+ this.getCompleteLatencies().record(stream, latencyMs);
+ }
+
+ public void spoutFailedTuple(String stream, long latencyMs) { // NOTE(review): latencyMs is accepted but never recorded here
+ this.getFailed().incBy(stream, this.rate);
+ }
+
+ public Map renderStats() { // builds a fresh map of the common and spout field values plus a TYPE entry
+ cleanupStats(); // calls close() on every registered count/latency metric before the values are read
+ Map ret = new HashMap();
+ ret.putAll(valueStats(CommonStats.COMMON_FIELDS)); // each valueStats() call also writes the RATE entry
+ ret.putAll(valueStats(SpoutExecutorStats.SPOUT_FIELDS));
- StatsUtil.putRawKV(ret, StatsUtil.TYPE, StatsUtil.KW_SPOUT);
++ StatsUtil.putKV(ret, StatsUtil.TYPE, StatsUtil.KW_SPOUT); // stores KW_SPOUT under the TYPE key
+
+ return ret;
+ }
+
+// public ExecutorStats renderStats() {
+// cleanupStats();
+//
+// ExecutorStats ret = new ExecutorStats();
+// ret.set_emitted(valueStat(EMITTED));
+// ret.set_transferred(valueStat(TRANSFERRED));
+// ret.set_rate(this.rate);
+//
+// SpoutStats spoutStats = new SpoutStats(
+// valueStat(ACKED), valueStat(FAILED), valueStat(COMPLETE_LATENCIES));
+// ret.set_specific(ExecutorSpecificStats.spout(spoutStats));
+//
+// return ret;
+// }
+}