You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:57 UTC
[19/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/Merger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/Merger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/Merger.java
new file mode 100644
index 0000000..4e31019
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/Merger.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.merger;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+public interface Merger<V> extends Serializable {
+ V merge(Collection<V> objs, V unflushed, Object... others);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/SumMerger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/SumMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/SumMerger.java
new file mode 100644
index 0000000..b4d65e6
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/SumMerger.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.merger;
+
+import java.util.Collection;
+
+import com.alibaba.jstorm.utils.JStormUtils;
+
+public class SumMerger<T extends Number> implements Merger<T> {
+ private static final long serialVersionUID = -7026523452570138433L;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T merge(Collection<T> objs, T unflushed, Object... others) {
+ // TODO Auto-generated method stub
+ T ret = unflushed;
+ for (T obj : objs) {
+ ret = (T) JStormUtils.add(ret, obj);
+ }
+
+ return ret;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/TpsMerger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/TpsMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/TpsMerger.java
new file mode 100644
index 0000000..4d770d6
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/TpsMerger.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.merger;
+
+import java.util.Collection;
+
+import com.alibaba.jstorm.common.metric.old.operator.StartTime;
+
+public class TpsMerger implements Merger<Double> {
+ private static final long serialVersionUID = -4534840881635955942L;
+ protected final long createTime;
+
+ public TpsMerger() {
+ createTime = System.currentTimeMillis();
+ }
+
+ public long getRunMillis(Object... args) {
+ long startTime = createTime;
+
+ if (args != null) {
+ if (args[0] != null && args[0] instanceof StartTime) {
+ StartTime rollingWindow = (StartTime) args[0];
+
+ startTime = rollingWindow.getStartTime();
+ }
+ }
+
+ return (System.currentTimeMillis() - startTime);
+ }
+
+ @Override
+ public Double merge(Collection<Double> objs, Double unflushed, Object... others) {
+ // TODO Auto-generated method stub
+ double sum = 0.0d;
+ if (unflushed != null) {
+ sum += unflushed;
+ }
+
+ for (Double item : objs) {
+ if (item != null) {
+ sum += item;
+ }
+ }
+
+ Double ret = (sum * 1000) / getRunMillis(others);
+ return ret;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AddUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AddUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AddUpdater.java
new file mode 100644
index 0000000..a6c06f6
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AddUpdater.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.updater;
+
+import com.alibaba.jstorm.utils.JStormUtils;
+
+public class AddUpdater<T extends Number> implements Updater<T> {
+ private static final long serialVersionUID = -7955740095421752763L;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T update(Number object, T cache, Object... others) {
+ // TODO Auto-generated method stub
+ return (T) JStormUtils.add(cache, object);
+ }
+
+ @Override
+ public T updateBatch(T object, T cache, Object... objects) {
+ // TODO Auto-generated method stub
+ return (T) JStormUtils.add(cache, object);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AvgUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AvgUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AvgUpdater.java
new file mode 100644
index 0000000..a0b7aa1
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AvgUpdater.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.updater;
+
+import com.alibaba.jstorm.common.metric.old.Histogram;
+
+public class AvgUpdater implements Updater<Histogram.HistorgramPair> {
+ private static final long serialVersionUID = 2562836921724586449L;
+
+ @Override
+ public Histogram.HistorgramPair update(Number object, Histogram.HistorgramPair cache, Object... others) {
+ // TODO Auto-generated method stub
+ if (object == null) {
+ return cache;
+ }
+ if (cache == null) {
+ cache = new Histogram.HistorgramPair();
+ }
+
+ cache.addValue(object.doubleValue());
+ cache.addTimes(1l);
+
+ return cache;
+ }
+
+ @Override
+ public Histogram.HistorgramPair updateBatch(Histogram.HistorgramPair object, Histogram.HistorgramPair cache, Object... objects) {
+ // TODO Auto-generated method stub
+ if (object == null) {
+ return cache;
+ }
+ if (cache == null) {
+ cache = new Histogram.HistorgramPair();
+ }
+
+ cache.addValue(object.getSum());
+ cache.addTimes(object.getTimes());
+
+ return cache;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/DoubleAddUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/DoubleAddUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/DoubleAddUpdater.java
new file mode 100644
index 0000000..b25f97b
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/DoubleAddUpdater.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.updater;
+
+import com.google.common.util.concurrent.AtomicDouble;
+
+public class DoubleAddUpdater implements Updater<AtomicDouble> {
+ private static final long serialVersionUID = -1293565961076552462L;
+
+ @Override
+ public AtomicDouble update(Number object, AtomicDouble cache, Object... others) {
+ // TODO Auto-generated method stub
+ if (cache == null) {
+ cache = new AtomicDouble(0.0);
+ }
+ if (object != null) {
+ cache.addAndGet(object.doubleValue());
+ }
+ return cache;
+ }
+
+ @Override
+ public AtomicDouble updateBatch(AtomicDouble object, AtomicDouble cache, Object... objects) {
+ // TODO Auto-generated method stub
+ return update(object, cache, objects);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/LongAddUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/LongAddUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/LongAddUpdater.java
new file mode 100644
index 0000000..8db6f21
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/LongAddUpdater.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.updater;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class LongAddUpdater implements Updater<AtomicLong> {
+ private static final long serialVersionUID = -2185639264737912405L;
+
+ @Override
+ public AtomicLong update(Number object, AtomicLong cache, Object... others) {
+ // TODO Auto-generated method stub
+ if (cache == null) {
+ cache = new AtomicLong(0);
+ }
+
+ if (object != null) {
+ cache.addAndGet(object.longValue());
+ }
+ return cache;
+ }
+
+ @Override
+ public AtomicLong updateBatch(AtomicLong object, AtomicLong cache, Object... objects) {
+ // TODO Auto-generated method stub
+ return update(object, cache, objects);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/Updater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/Updater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/Updater.java
new file mode 100644
index 0000000..42d7b58
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/Updater.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.updater;
+
+import java.io.Serializable;
+
+public interface Updater<V> extends Serializable {
+ V update(Number object, V cache, Object... others);
+
+ V updateBatch(V object, V cache, Object... objects);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/AllWindow.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/AllWindow.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/AllWindow.java
new file mode 100644
index 0000000..244db74
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/AllWindow.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.window;
+
+import com.alibaba.jstorm.common.metric.old.operator.Sampling;
+import com.alibaba.jstorm.common.metric.old.operator.StartTime;
+import com.alibaba.jstorm.common.metric.old.operator.merger.Merger;
+import com.alibaba.jstorm.common.metric.old.operator.updater.Updater;
+
+import java.util.ArrayList;
+
+public class AllWindow<V> implements Sampling<V>, StartTime {
+
+ private static final long serialVersionUID = -8523514907315740812L;
+
+ protected V unflushed;
+ protected V defaultValue;
+
+ protected Updater<V> updater;
+ protected Merger<V> merger;
+ protected long startTime;
+
+ AllWindow(V defaultValue, Updater<V> updater, Merger<V> merger) {
+
+ this.updater = updater;
+ this.merger = merger;
+
+ this.defaultValue = defaultValue;
+ this.startTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public void update(Number obj) {
+ // TODO Auto-generated method stub
+ synchronized (this) {
+ unflushed = updater.update(obj, unflushed);
+ }
+ }
+
+ public void updateBatch(V batch) {
+ synchronized (this) {
+ unflushed = updater.updateBatch(batch, unflushed);
+ }
+ }
+
+ @Override
+ public V getSnapshot() {
+ // TODO Auto-generated method stub
+ V ret = merger.merge(new ArrayList<V>(), unflushed, this);
+ if (ret == null) {
+ return defaultValue;
+ } else {
+ return ret;
+ }
+ }
+
+ @Override
+ public long getStartTime() {
+ // TODO Auto-generated method stub
+ return startTime;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/Metric.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/Metric.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/Metric.java
new file mode 100644
index 0000000..e505a1f
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/Metric.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.window;
+
+import com.alibaba.jstorm.callback.Callback;
+import com.alibaba.jstorm.common.metric.old.operator.Sampling;
+import com.alibaba.jstorm.common.metric.old.operator.convert.Convertor;
+import com.alibaba.jstorm.common.metric.old.operator.merger.Merger;
+import com.alibaba.jstorm.common.metric.old.operator.updater.Updater;
+import com.alibaba.jstorm.utils.IntervalCheck;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class Metric<T, V> implements Sampling<Map<Integer, T>> {
+ private static final long serialVersionUID = -1362345159511508074L;
+ private static final Logger LOG = LoggerFactory.getLogger(Metric.class);
+
+ protected static boolean enable;
+
+ public static void setEnable(boolean e) {
+ enable = e;
+ }
+
+ protected List<RollingWindow<V>> rollingWindows;
+ protected AllWindow<V> allWindow;
+
+ protected int[] windowSeconds = { StatBuckets.MINUTE_WINDOW, StatBuckets.HOUR_WINDOW, StatBuckets.DAY_WINDOW };
+ protected int bucketSize = StatBuckets.NUM_STAT_BUCKETS;
+ protected V defaultValue;
+ protected Updater<V> updater;
+ protected Merger<V> merger;
+ protected Convertor<V, T> convertor;
+ protected Callback callback;
+
+ protected int interval; // unit is second
+ protected IntervalCheck intervalCheck;
+ protected V unflushed;
+
+ public Metric() {
+ }
+
+ public int getInterval() {
+ if (windowSeconds == null || windowSeconds.length == 0) {
+ return StatBuckets.NUM_STAT_BUCKETS;
+ }
+
+ int intervals[] = new int[windowSeconds.length];
+ int smallest = Integer.MAX_VALUE;
+ for (int i = 0; i < windowSeconds.length; i++) {
+ int interval = windowSeconds[i] / bucketSize;
+ intervals[i] = interval;
+ if (interval < smallest) {
+ smallest = interval;
+ }
+ }
+
+ for (int goodInterval = smallest; goodInterval > 1; goodInterval--) {
+ boolean good = true;
+ for (int interval : intervals) {
+ if (interval % goodInterval != 0) {
+ good = false;
+ break;
+ }
+ }
+
+ if (good == true) {
+ return goodInterval;
+ }
+ }
+
+ return 1;
+ }
+
+ public void init() {
+ if (defaultValue == null || updater == null || merger == null || convertor == null) {
+ throw new IllegalArgumentException("Invalid argements");
+ }
+
+ rollingWindows = new ArrayList<RollingWindow<V>>();
+ if (windowSeconds != null) {
+ rollingWindows.clear();
+ for (int windowSize : windowSeconds) {
+ RollingWindow<V> rollingWindow = new RollingWindow<V>(defaultValue, windowSize / bucketSize, windowSize, updater, merger);
+
+ rollingWindows.add(rollingWindow);
+ }
+
+ }
+ allWindow = new AllWindow<V>(defaultValue, updater, merger);
+
+ this.interval = getInterval();
+ this.intervalCheck = new IntervalCheck();
+ this.intervalCheck.setInterval(interval);
+ }
+
+ /**
+ * In order to improve performance Do
+ */
+ @Override
+ public void update(Number obj) {
+ if (enable == false) {
+ return;
+ }
+
+ if (intervalCheck.check()) {
+ flush();
+ }
+ synchronized (this) {
+ unflushed = updater.update(obj, unflushed);
+ }
+ }
+
+ public synchronized void flush() {
+ if (unflushed == null) {
+ return;
+ }
+ for (RollingWindow<V> rollingWindow : rollingWindows) {
+ rollingWindow.updateBatch(unflushed);
+ }
+ allWindow.updateBatch(unflushed);
+ unflushed = null;
+ }
+
+ @Override
+ public Map<Integer, T> getSnapshot() {
+ // TODO Auto-generated method stub
+ flush();
+
+ Map<Integer, T> ret = new TreeMap<Integer, T>();
+ for (RollingWindow<V> rollingWindow : rollingWindows) {
+ V value = rollingWindow.getSnapshot();
+
+ ret.put(rollingWindow.getWindowSecond(), convertor.convert(value));
+ }
+
+ ret.put(StatBuckets.ALL_TIME_WINDOW, convertor.convert(allWindow.getSnapshot()));
+
+ if (callback != null) {
+ callback.execute(this);
+ }
+ return ret;
+ }
+
+ public T getAllTimeValue() {
+ return convertor.convert(allWindow.getSnapshot());
+ }
+
+ public int[] getWindowSeconds() {
+ return windowSeconds;
+ }
+
+ public void setWindowSeconds(int[] windowSeconds) {
+ this.windowSeconds = windowSeconds;
+ }
+
+ public int getBucketSize() {
+ return bucketSize;
+ }
+
+ public void setBucketSize(int bucketSize) {
+ this.bucketSize = bucketSize;
+ }
+
+ public V getDefaultValue() {
+ return defaultValue;
+ }
+
+ public void setDefaultValue(V defaultValue) {
+ this.defaultValue = defaultValue;
+ }
+
+ public Updater<V> getUpdater() {
+ return updater;
+ }
+
+ public void setUpdater(Updater<V> updater) {
+ this.updater = updater;
+ }
+
+ public Merger<V> getMerger() {
+ return merger;
+ }
+
+ public void setMerger(Merger<V> merger) {
+ this.merger = merger;
+ }
+
+ public Convertor<V, T> getConvertor() {
+ return convertor;
+ }
+
+ public void setConvertor(Convertor<V, T> convertor) {
+ this.convertor = convertor;
+ }
+
+ public Callback getCallback() {
+ return callback;
+ }
+
+ public void setCallback(Callback callback) {
+ this.callback = callback;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/RollingWindow.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/RollingWindow.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/RollingWindow.java
new file mode 100644
index 0000000..5963951
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/RollingWindow.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.window;
+
+import com.alibaba.jstorm.common.metric.old.operator.Sampling;
+import com.alibaba.jstorm.common.metric.old.operator.StartTime;
+import com.alibaba.jstorm.common.metric.old.operator.merger.Merger;
+import com.alibaba.jstorm.common.metric.old.operator.updater.Updater;
+import com.alibaba.jstorm.utils.IntervalCheck;
+import com.alibaba.jstorm.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.TreeMap;
+
+public class RollingWindow<V> implements Sampling<V>, StartTime {
+ private static final long serialVersionUID = 3794478417380003279L;
+ private static final Logger LOG = LoggerFactory.getLogger(RollingWindow.class);
+
+ protected long startTime;
+ protected Integer currBucketTime;
+ protected int interval; // unit is second
+ protected int windowSecond;
+ protected IntervalCheck intervalCheck;
+
+ protected TreeMap<Integer, V> buckets;
+ protected Integer bucketNum;
+ protected V unflushed;
+ protected V defaultValue;
+
+ protected Updater<V> updater;
+ protected Merger<V> merger;
+
+ RollingWindow(V defaultValue, int interval, int windowSecond, Updater<V> updater, Merger<V> merger) {
+ this.startTime = System.currentTimeMillis();
+ this.interval = interval;
+ this.intervalCheck = new IntervalCheck();
+ this.intervalCheck.setInterval(interval);
+ this.currBucketTime = getCurrBucketTime();
+
+ this.bucketNum = windowSecond / interval;
+ this.windowSecond = (bucketNum) * interval;
+
+ this.buckets = new TreeMap<Integer, V>();
+
+ this.updater = updater;
+ this.merger = merger;
+
+ this.defaultValue = defaultValue;
+
+ }
+
+ @Override
+ public void update(Number obj) {
+ // TODO Auto-generated method stub
+
+ if (intervalCheck.check()) {
+ rolling();
+ }
+ synchronized (this) {
+ unflushed = updater.update(obj, unflushed);
+
+ }
+
+ }
+
+ /**
+ * In order to improve performance Flush one batch to rollingWindow
+ *
+ */
+ public void updateBatch(V batch) {
+
+ if (intervalCheck.check()) {
+ rolling();
+ }
+ synchronized (this) {
+ unflushed = updater.updateBatch(batch, unflushed);
+ }
+
+ }
+
+ @Override
+ public V getSnapshot() {
+ // TODO Auto-generated method stub
+ if (intervalCheck.check()) {
+ rolling();
+ }
+
+ cleanExpiredBuckets();
+ // @@@ Testing
+ // LOG.info("Raw Data:" + buckets + ",unflushed:" + unflushed);
+
+ Collection<V> values = buckets.values();
+
+ V ret = merger.merge(values, unflushed, this);
+ if (ret == null) {
+
+ // @@@ testing
+ // LOG.warn("!!!!Exist null data !!!!!");
+ return defaultValue;
+ }
+ return ret;
+ }
+
+ /*
+ * Move the "current bucket time" index and clean the expired buckets
+ */
+ protected void rolling() {
+ synchronized (this) {
+ if (unflushed != null) {
+ buckets.put(currBucketTime, unflushed);
+ unflushed = null;
+ }
+
+ currBucketTime = getCurrBucketTime();
+
+ return;
+ }
+ }
+
+ protected void cleanExpiredBuckets() {
+ int nowSec = TimeUtils.current_time_secs();
+ int startRemove = nowSec - (interval - 1) - windowSecond;
+
+ List<Integer> removeList = new ArrayList<Integer>();
+
+ for (Integer keyTime : buckets.keySet()) {
+ if (keyTime < startRemove) {
+ removeList.add(keyTime);
+ } else if (keyTime >= startRemove) {
+ break;
+ }
+ }
+
+ for (Integer removeKey : removeList) {
+ buckets.remove(removeKey);
+ // @@@ Testing
+ // LOG.info("Remove key:" + removeKey + ", diff:" + (nowSec - removeKey));
+
+ }
+
+ if (buckets.isEmpty() == false) {
+ Integer first = buckets.firstKey();
+ startTime = first.longValue() * 1000;
+ }
+ }
+
+ public int getWindowSecond() {
+ return windowSecond;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public int getInterval() {
+ return interval;
+ }
+
+ public Integer getBucketNum() {
+ return bucketNum;
+ }
+
+ public V getDefaultValue() {
+ return defaultValue;
+ }
+
+ private Integer getCurrBucketTime() {
+ return (TimeUtils.current_time_secs() / interval) * interval;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/StatBuckets.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/StatBuckets.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/StatBuckets.java
new file mode 100644
index 0000000..30f5c64
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/StatBuckets.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.window;
+
+import com.google.common.base.Joiner;
+
+import java.util.*;
+
+public class StatBuckets {
+
+ public static final Integer NUM_STAT_BUCKETS = 20;
+
+ public static final Integer MINUTE_WINDOW = 600;
+ public static final Integer HOUR_WINDOW = 10800;
+ public static final Integer DAY_WINDOW = 86400;
+ public static final Integer ALL_TIME_WINDOW = 0;
+ public static Set<Integer> TIME_WINDOWS = new TreeSet<Integer>();
+ static {
+ TIME_WINDOWS.add(ALL_TIME_WINDOW);
+ TIME_WINDOWS.add(MINUTE_WINDOW);
+ TIME_WINDOWS.add(HOUR_WINDOW);
+ TIME_WINDOWS.add(DAY_WINDOW);
+ }
+
+ public static final String MINUTE_WINDOW_STR = "0d0h10m0s";
+ public static final String HOUR_WINDOW_STR = "0d3h0m0s";
+ public static final String DAY_WINDOW_STR = "1d0h0m0s";
+ public static final String ALL_WINDOW_STR = "All-time";
+
+ public static Integer[] STAT_BUCKETS = { MINUTE_WINDOW / NUM_STAT_BUCKETS, HOUR_WINDOW / NUM_STAT_BUCKETS, DAY_WINDOW / NUM_STAT_BUCKETS };
+
+ private static final String[][] PRETTYSECDIVIDERS = { new String[] { "s", "60" }, new String[] { "m", "60" }, new String[] { "h", "24" },
+ new String[] { "d", null } };
+
+ /**
+ * Service b
+ *
+ * @param key
+ * @return
+ */
+ public static String parseTimeKey(Integer key) {
+ if (key == 0) {
+ return ALL_WINDOW_STR;
+ } else {
+ return String.valueOf(key);
+ }
+ }
+
+ /**
+ *
+ * Default is the latest result
+ *
+ * @param showKey
+ * @return
+ */
+ public static Integer getTimeKey(String showKey) {
+ Integer window = null;
+ if (showKey == null) {
+ window = (MINUTE_WINDOW);
+ } else if (showKey.equals(MINUTE_WINDOW_STR)) {
+ window = (MINUTE_WINDOW);
+ } else if (showKey.equals(HOUR_WINDOW_STR)) {
+ window = (HOUR_WINDOW);
+ } else if (showKey.equals(DAY_WINDOW_STR)) {
+ window = (DAY_WINDOW);
+ } else if (showKey.equals(ALL_WINDOW_STR)) {
+ window = ALL_TIME_WINDOW;
+ } else {
+ window = MINUTE_WINDOW;
+ }
+
+ return window;
+ }
+
+ /**
+ * Default is the latest result
+ *
+ * @param showStr
+ * @return
+ */
+ public static String getShowTimeStr(Integer time) {
+ if (time == null) {
+ return MINUTE_WINDOW_STR;
+ } else if (time.equals(MINUTE_WINDOW)) {
+ return MINUTE_WINDOW_STR;
+ } else if (time.equals(HOUR_WINDOW)) {
+ return HOUR_WINDOW_STR;
+ } else if (time.equals(DAY_WINDOW)) {
+ return DAY_WINDOW_STR;
+ } else if (time.equals(ALL_TIME_WINDOW)) {
+ return ALL_WINDOW_STR;
+ } else {
+ return MINUTE_WINDOW_STR;
+ }
+
+ }
+
+ /**
+ * seconds to string like 1d20h30m40s
+ *
+ * @param secs
+ * @return
+ */
+ public static String prettyUptimeStr(int secs) {
+ int diversize = PRETTYSECDIVIDERS.length;
+
+ List<String> tmp = new ArrayList<String>();
+ int div = secs;
+ for (int i = 0; i < diversize; i++) {
+ if (PRETTYSECDIVIDERS[i][1] != null) {
+ Integer d = Integer.parseInt(PRETTYSECDIVIDERS[i][1]);
+ tmp.add(div % d + PRETTYSECDIVIDERS[i][0]);
+ div = div / d;
+ } else {
+ tmp.add(div + PRETTYSECDIVIDERS[i][0]);
+ }
+ }
+
+ String rtn = "";
+ int tmpSzie = tmp.size();
+ for (int j = tmpSzie - 1; j > -1; j--) {
+ rtn += tmp.get(j);
+ }
+ return rtn;
+ }
+
+ /**
+ * seconds to string like '30m 40s' and '1d 20h 30m 40s'
+ *
+ * @param secs
+ * @return
+ */
+ public static String prettyUptime(int secs) {
+ int diversize = PRETTYSECDIVIDERS.length;
+
+ LinkedList<String> tmp = new LinkedList<>();
+ int div = secs;
+ for (int i = 0; i < diversize; i++) {
+ if (PRETTYSECDIVIDERS[i][1] != null) {
+ Integer d = Integer.parseInt(PRETTYSECDIVIDERS[i][1]);
+ tmp.addFirst(div % d + PRETTYSECDIVIDERS[i][0]);
+ div = div / d;
+ } else {
+ tmp.addFirst(div + PRETTYSECDIVIDERS[i][0]);
+ }
+ if (div <= 0 ) break;
+ }
+
+ Joiner joiner = Joiner.on(" ");
+ return joiner.join(tmp);
+ }
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+ // TODO Auto-generated method stub
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/Sampling.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/Sampling.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/Sampling.java
deleted file mode 100755
index 3d32cc9..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/Sampling.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator;
-
-import java.io.Serializable;
-
-public interface Sampling<V> extends Serializable {
-
- /**
- * Update object into Metric
- *
- * @param obj
- */
- void update(Number obj);
-
- /**
- *
- * Get snapshot of Metric
- *
- * @return
- */
- V getSnapshot();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/StartTime.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/StartTime.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/StartTime.java
deleted file mode 100755
index 0b6173f..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/StartTime.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator;
-
-public interface StartTime {
- long getStartTime();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/AtomicLongToLong.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/AtomicLongToLong.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/AtomicLongToLong.java
deleted file mode 100755
index 8f142f1..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/AtomicLongToLong.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.convert;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class AtomicLongToLong implements Convertor<AtomicLong, Long> {
- private static final long serialVersionUID = -2755066621494409063L;
-
- @Override
- public Long convert(AtomicLong obj) {
- // TODO Auto-generated method stub
- if (obj == null) {
- return null;
- } else {
- return obj.get();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/Convertor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/Convertor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/Convertor.java
deleted file mode 100755
index 73cdceb..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/Convertor.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.convert;
-
-import java.io.Serializable;
-
-public interface Convertor<From, To> extends Serializable {
-
- To convert(From obj);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/DefaultConvertor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/DefaultConvertor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/DefaultConvertor.java
deleted file mode 100755
index 47065d0..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/DefaultConvertor.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.convert;
-
-public class DefaultConvertor<T> implements Convertor<T, T> {
- private static final long serialVersionUID = -647209923903679727L;
-
- @Override
- public T convert(T obj) {
- // TODO Auto-generated method stub
- return obj;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/SetToList.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/SetToList.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/SetToList.java
deleted file mode 100755
index 4891222..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/SetToList.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.convert;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-public class SetToList<T> implements Convertor<Set<T>, List<T>> {
- private static final long serialVersionUID = 4968816655779625255L;
-
- @Override
- public List<T> convert(Set<T> set) {
- // TODO Auto-generated method stub
- List<T> ret = new ArrayList<T>();
- if (set != null) {
- for (T item : set) {
- ret.add(item);
- }
- }
- return ret;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java
deleted file mode 100755
index 3ad94f2..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.merger;
-
-import java.util.Collection;
-
-import com.alibaba.jstorm.common.metric.Histogram;
-import com.alibaba.jstorm.utils.Pair;
-
-public class AvgMerger implements Merger<Histogram.HistorgramPair> {
- private static final long serialVersionUID = -3892281208959055221L;
-
- @Override
- public Histogram.HistorgramPair merge(
- Collection<Histogram.HistorgramPair> objs,
- Histogram.HistorgramPair unflushed, Object... others) {
- // TODO Auto-generated method stub
- double sum = 0.0d;
- long times = 0l;
-
- if (unflushed != null) {
- sum = sum + unflushed.getSum();
- times = times + unflushed.getTimes();
- }
-
- for (Histogram.HistorgramPair item : objs) {
- if (item == null) {
- continue;
- }
- sum = sum + item.getSum();
- times = times + item.getTimes();
- }
-
- return new Histogram.HistorgramPair(sum, times);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java.bak
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java.bak b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java.bak
deleted file mode 100755
index 6f82888..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java.bak
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.merger;
-
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.alibaba.jstorm.utils.Pair;
-import com.google.common.util.concurrent.AtomicDouble;
-
-public class AvgMerger2 implements Merger<Pair<AtomicDouble, AtomicLong>> {
- private static final long serialVersionUID = -3892281208959055221L;
-
- @Override
- public Pair<AtomicDouble, AtomicLong> merge(
- Collection<Pair<AtomicDouble, AtomicLong>> objs,
- Pair<AtomicDouble, AtomicLong> unflushed, Object... others) {
- // TODO Auto-generated method stub
- AtomicDouble sum = new AtomicDouble(0.0);
- AtomicLong times = new AtomicLong(0);
-
- if (unflushed != null) {
- sum.addAndGet(unflushed.getFirst().get());
- times.addAndGet(unflushed.getSecond().get());
- }
-
- for (Pair<AtomicDouble, AtomicLong> item : objs) {
- if (item == null) {
- continue;
- }
- sum.addAndGet(item.getFirst().get());
- times.addAndGet(item.getSecond().get());
- }
-
- return new Pair<AtomicDouble, AtomicLong>(sum, times);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/LongSumMerger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/LongSumMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/LongSumMerger.java
deleted file mode 100755
index 30ded34..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/LongSumMerger.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.merger;
-
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class LongSumMerger implements Merger<AtomicLong> {
- private static final long serialVersionUID = -3500779273677666691L;
-
- @Override
- public AtomicLong merge(Collection<AtomicLong> objs, AtomicLong unflushed,
- Object... others) {
- AtomicLong ret = new AtomicLong(0);
- if (unflushed != null) {
- ret.addAndGet(unflushed.get());
- }
-
- for (AtomicLong item : objs) {
- if (item == null) {
- continue;
- }
- ret.addAndGet(item.get());
- }
- return ret;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/Merger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/Merger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/Merger.java
deleted file mode 100755
index 0483458..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/Merger.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.merger;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-public interface Merger<V> extends Serializable {
- V merge(Collection<V> objs, V unflushed, Object... others);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/SumMerger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/SumMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/SumMerger.java
deleted file mode 100755
index ead3c53..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/SumMerger.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.merger;
-
-import java.util.Collection;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-
-public class SumMerger<T extends Number> implements Merger<T> {
- private static final long serialVersionUID = -7026523452570138433L;
-
- @SuppressWarnings("unchecked")
- @Override
- public T merge(Collection<T> objs, T unflushed, Object... others) {
- // TODO Auto-generated method stub
- T ret = unflushed;
- for (T obj : objs) {
- ret = (T) JStormUtils.add(ret, obj);
- }
-
- return ret;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/TpsMerger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/TpsMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/TpsMerger.java
deleted file mode 100755
index 859f642..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/TpsMerger.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.merger;
-
-import java.util.Collection;
-
-import com.alibaba.jstorm.common.metric.operator.StartTime;
-
-public class TpsMerger implements Merger<Double> {
- private static final long serialVersionUID = -4534840881635955942L;
- protected final long createTime;
-
- public TpsMerger() {
- createTime = System.currentTimeMillis();
- }
-
- public long getRunMillis(Object... args) {
- long startTime = createTime;
-
- if (args != null) {
- if (args[0] != null && args[0] instanceof StartTime) {
- StartTime rollingWindow = (StartTime) args[0];
-
- startTime = rollingWindow.getStartTime();
- }
- }
-
- return (System.currentTimeMillis() - startTime);
- }
-
- @Override
- public Double merge(Collection<Double> objs, Double unflushed,
- Object... others) {
- // TODO Auto-generated method stub
- double sum = 0.0d;
- if (unflushed != null) {
- sum += unflushed;
- }
-
- for (Double item : objs) {
- if (item != null) {
- sum += item;
- }
- }
-
- Double ret = (sum * 1000) / getRunMillis(others);
- return ret;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AddUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AddUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AddUpdater.java
deleted file mode 100755
index 4fdf813..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AddUpdater.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.updater;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-
-public class AddUpdater<T extends Number> implements Updater<T> {
- private static final long serialVersionUID = -7955740095421752763L;
-
- @SuppressWarnings("unchecked")
- @Override
- public T update(Number object, T cache, Object... others) {
- // TODO Auto-generated method stub
- return (T) JStormUtils.add(cache, object);
- }
-
- @Override
- public T updateBatch(T object, T cache, Object... objects) {
- // TODO Auto-generated method stub
- return (T) JStormUtils.add(cache, object);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java
deleted file mode 100755
index 30ae46c..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.updater;
-
-import com.alibaba.jstorm.common.metric.Histogram;
-
-public class AvgUpdater implements Updater<Histogram.HistorgramPair> {
- private static final long serialVersionUID = 2562836921724586449L;
-
- @Override
- public Histogram.HistorgramPair update(Number object,
- Histogram.HistorgramPair cache, Object... others) {
- // TODO Auto-generated method stub
- if (object == null) {
- return cache;
- }
- if (cache == null) {
- cache =
- new Histogram.HistorgramPair();
- }
-
- cache.addValue(object.doubleValue());
- cache.addTimes(1l);
-
- return cache;
- }
-
- @Override
- public Histogram.HistorgramPair updateBatch(
- Histogram.HistorgramPair object,
- Histogram.HistorgramPair cache, Object... objects) {
- // TODO Auto-generated method stub
- if (object == null) {
- return cache;
- }
- if (cache == null) {
- cache =
- new Histogram.HistorgramPair();
- }
-
- cache.addValue(object.getSum());
- cache.addTimes(object.getTimes());
-
- return cache;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java.bak
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java.bak b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java.bak
deleted file mode 100755
index 44cc70d..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java.bak
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.updater;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.alibaba.jstorm.utils.Pair;
-import com.google.common.util.concurrent.AtomicDouble;
-
-public class AvgUpdater2 implements Updater<Pair<AtomicDouble, AtomicLong>> {
- private static final long serialVersionUID = 2562836921724586449L;
-
- @Override
- public Pair<AtomicDouble, AtomicLong> update(Number object,
- Pair<AtomicDouble, AtomicLong> cache, Object... others) {
- // TODO Auto-generated method stub
- if (object == null) {
- return cache;
- }
- if (cache == null) {
- cache =
- new Pair<AtomicDouble, AtomicLong>(new AtomicDouble(0.0),
- new AtomicLong(0));
- }
-
- AtomicDouble sum = cache.getFirst();
- AtomicLong times = cache.getSecond();
-
- sum.addAndGet(object.doubleValue());
- times.incrementAndGet();
-
- return cache;
- }
-
- @Override
- public Pair<AtomicDouble, AtomicLong> updateBatch(
- Pair<AtomicDouble, AtomicLong> object,
- Pair<AtomicDouble, AtomicLong> cache, Object... objects) {
- // TODO Auto-generated method stub
- if (object == null) {
- return cache;
- }
- if (cache == null) {
- cache =
- new Pair<AtomicDouble, AtomicLong>(new AtomicDouble(0.0),
- new AtomicLong(0));
- }
-
- AtomicDouble sum = cache.getFirst();
- AtomicLong times = cache.getSecond();
-
- sum.addAndGet(object.getFirst().get());
- times.addAndGet(object.getSecond().get());
-
- return cache;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/DoubleAddUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/DoubleAddUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/DoubleAddUpdater.java
deleted file mode 100755
index e3b640a..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/DoubleAddUpdater.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.updater;
-
-import com.google.common.util.concurrent.AtomicDouble;
-
-public class DoubleAddUpdater implements Updater<AtomicDouble> {
- private static final long serialVersionUID = -1293565961076552462L;
-
- @Override
- public AtomicDouble update(Number object, AtomicDouble cache,
- Object... others) {
- // TODO Auto-generated method stub
- if (cache == null) {
- cache = new AtomicDouble(0.0);
- }
- if (object != null) {
- cache.addAndGet(object.doubleValue());
- }
- return cache;
- }
-
- @Override
- public AtomicDouble updateBatch(AtomicDouble object, AtomicDouble cache,
- Object... objects) {
- // TODO Auto-generated method stub
- return update(object, cache, objects);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/LongAddUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/LongAddUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/LongAddUpdater.java
deleted file mode 100755
index 4986146..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/LongAddUpdater.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.updater;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class LongAddUpdater implements Updater<AtomicLong> {
- private static final long serialVersionUID = -2185639264737912405L;
-
- @Override
- public AtomicLong update(Number object, AtomicLong cache, Object... others) {
- // TODO Auto-generated method stub
- if (cache == null) {
- cache = new AtomicLong(0);
- }
-
- if (object != null) {
- cache.addAndGet(object.longValue());
- }
- return cache;
- }
-
- @Override
- public AtomicLong updateBatch(AtomicLong object, AtomicLong cache,
- Object... objects) {
- // TODO Auto-generated method stub
- return update(object, cache, objects);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/Updater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/Updater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/Updater.java
deleted file mode 100755
index cb22c4c..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/Updater.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.updater;
-
-import java.io.Serializable;
-
-public interface Updater<V> extends Serializable {
- V update(Number object, V cache, Object... others);
- V updateBatch(V object, V cache, Object... objects );
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmCounterSnapshot.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmCounterSnapshot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmCounterSnapshot.java
new file mode 100644
index 0000000..2f79141
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmCounterSnapshot.java
@@ -0,0 +1,20 @@
+package com.alibaba.jstorm.common.metric.snapshot;
+
+/**
+ * @author wange
+ * @since 15/6/5
+ */
+public class AsmCounterSnapshot extends AsmSnapshot {
+ private static final long serialVersionUID = -7574994037947802582L;
+
+ private long v;
+
+ public long getV() {
+ return v;
+ }
+
+ public AsmSnapshot setValue(long value) {
+ this.v = value;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmGaugeSnapshot.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmGaugeSnapshot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmGaugeSnapshot.java
new file mode 100644
index 0000000..221b5b1
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmGaugeSnapshot.java
@@ -0,0 +1,20 @@
+package com.alibaba.jstorm.common.metric.snapshot;
+
+/**
+ * @author wange
+ * @since 15/6/5
+ */
+public class AsmGaugeSnapshot extends AsmSnapshot {
+ private static final long serialVersionUID = 3216517772824794848L;
+
+ private double v;
+
+ public double getV() {
+ return v;
+ }
+
+ public AsmSnapshot setValue(double value) {
+ this.v = value;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmHistogramSnapshot.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmHistogramSnapshot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmHistogramSnapshot.java
new file mode 100644
index 0000000..51ac3f5
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmHistogramSnapshot.java
@@ -0,0 +1,22 @@
+package com.alibaba.jstorm.common.metric.snapshot;
+
+import com.codahale.metrics.Snapshot;
+
+/**
+ * @author wange
+ * @since 15/6/5
+ */
+public class AsmHistogramSnapshot extends AsmSnapshot {
+ private static final long serialVersionUID = 7284437562594156565L;
+
+ private Snapshot snapshot;
+
+ public Snapshot getSnapshot() {
+ return snapshot;
+ }
+
+ public AsmSnapshot setSnapshot(Snapshot snapshot) {
+ this.snapshot = snapshot;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmMeterSnapshot.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmMeterSnapshot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmMeterSnapshot.java
new file mode 100644
index 0000000..e255e6b
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmMeterSnapshot.java
@@ -0,0 +1,50 @@
+package com.alibaba.jstorm.common.metric.snapshot;
+
+/**
+ * @author wange
+ * @since 15/6/5
+ */
+public class AsmMeterSnapshot extends AsmSnapshot {
+ private static final long serialVersionUID = -1754325312045025810L;
+
+ private double m1;
+ private double m5;
+ private double m15;
+ private double mean;
+
+ public double getM1() {
+ return m1;
+ }
+
+ public AsmMeterSnapshot setM1(double m1) {
+ this.m1 = m1;
+ return this;
+ }
+
+ public double getM5() {
+ return m5;
+ }
+
+ public AsmMeterSnapshot setM5(double m5) {
+ this.m5 = m5;
+ return this;
+ }
+
+ public double getM15() {
+ return m15;
+ }
+
+ public AsmMeterSnapshot setM15(double m15) {
+ this.m15 = m15;
+ return this;
+ }
+
+ public double getMean() {
+ return mean;
+ }
+
+ public AsmMeterSnapshot setMean(double mean) {
+ this.mean = mean;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmSnapshot.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmSnapshot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmSnapshot.java
new file mode 100644
index 0000000..4c71fe9
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmSnapshot.java
@@ -0,0 +1,32 @@
+package com.alibaba.jstorm.common.metric.snapshot;
+
+import java.io.Serializable;
+
+/**
+ * @author wange
+ * @since 15/6/5
+ */
+public abstract class AsmSnapshot implements Serializable {
+ private static final long serialVersionUID = 1945719653840917619L;
+
+ private long metricId;
+ private long ts;
+
+ public long getTs() {
+ return ts;
+ }
+
+ public AsmSnapshot setTs(long ts) {
+ this.ts = ts;
+ return this;
+ }
+
+ public long getMetricId() {
+ return metricId;
+ }
+
+ public AsmSnapshot setMetricId(long metricId) {
+ this.metricId = metricId;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmTimerSnapshot.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmTimerSnapshot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmTimerSnapshot.java
new file mode 100644
index 0000000..6fec50c
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmTimerSnapshot.java
@@ -0,0 +1,32 @@
+package com.alibaba.jstorm.common.metric.snapshot;
+
+import com.codahale.metrics.Snapshot;
+
+/**
+ * @author wange
+ * @since 15/6/5
+ */
+public class AsmTimerSnapshot extends AsmSnapshot {
+ private static final long serialVersionUID = 7784062881728741781L;
+
+ private Snapshot histogram;
+ private AsmMeterSnapshot meter;
+
+ public Snapshot getHistogram() {
+ return histogram;
+ }
+
+ public AsmTimerSnapshot setHistogram(Snapshot snapshot) {
+ this.histogram = snapshot;
+ return this;
+ }
+
+ public AsmMeterSnapshot getMeter() {
+ return meter;
+ }
+
+ public AsmTimerSnapshot setMeter(AsmMeterSnapshot meter) {
+ this.meter = meter;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/AllWindow.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/AllWindow.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/AllWindow.java
deleted file mode 100755
index 8475e4c..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/AllWindow.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.window;
-
-import java.util.ArrayList;
-
-import com.alibaba.jstorm.common.metric.operator.Sampling;
-import com.alibaba.jstorm.common.metric.operator.StartTime;
-import com.alibaba.jstorm.common.metric.operator.merger.Merger;
-import com.alibaba.jstorm.common.metric.operator.updater.Updater;
-
-public class AllWindow<V> implements Sampling<V>, StartTime {
-
- private static final long serialVersionUID = -8523514907315740812L;
-
- protected V unflushed;
- protected V defaultValue;
-
- protected Updater<V> updater;
- protected Merger<V> merger;
- protected long startTime;
-
- AllWindow(V defaultValue, Updater<V> updater, Merger<V> merger) {
-
- this.updater = updater;
- this.merger = merger;
-
- this.defaultValue = defaultValue;
- this.startTime = System.currentTimeMillis();
- }
-
- @Override
- public void update(Number obj) {
- // TODO Auto-generated method stub
- synchronized (this) {
- unflushed = updater.update(obj, unflushed);
- }
- }
-
- public void updateBatch(V batch) {
- synchronized (this) {
- unflushed = updater.updateBatch(batch, unflushed);
- }
- }
-
- @Override
- public V getSnapshot() {
- // TODO Auto-generated method stub
- V ret = merger.merge(new ArrayList<V>(), unflushed, this);
- if (ret == null) {
- return defaultValue;
- } else {
- return ret;
- }
- }
-
- @Override
- public long getStartTime() {
- // TODO Auto-generated method stub
- return startTime;
- }
-
-}