You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/12/14 04:33:48 UTC
[1/2] TAJO-333: Add metric system to Tajo. (hyoungjunkim via jihoon)
Updated Branches:
refs/heads/master 1d0d458bf -> 62c49c05f
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
new file mode 100644
index 0000000..c9d25c5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.metrics.GroupNameMetricsFilter;
+import org.apache.tajo.util.metrics.MetricsFilterList;
+import org.apache.tajo.util.metrics.RegexpMetricsFilter;
+
+import java.io.Closeable;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class TajoMetricsScheduledReporter extends TajoMetricsReporter implements Closeable {
+ private static final Log LOG = LogFactory.getLog(TajoMetricsScheduledReporter.class);
+
+ public static final String PERIOD_KEY = "period";
+
+ protected MetricRegistry registry;
+ protected ScheduledExecutorService executor;
+ protected MetricFilter filter;
+ protected double durationFactor;
+ protected String durationUnit;
+ protected double rateFactor;
+ protected String rateUnit;
+ protected Map<String, String> metricsProperties;
+ protected String metricsName;
+ protected String metricsPropertyKey;
+ protected String hostAndPort;
+ protected long period;
+
+ protected abstract String getReporterName();
+ protected abstract void afterInit();
+
+ private static class NamedThreadFactory implements ThreadFactory {
+ private final ThreadGroup group;
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+ private final String namePrefix;
+
+ private NamedThreadFactory(String name) {
+ final SecurityManager s = System.getSecurityManager();
+ this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+ this.namePrefix = "metrics-" + name + "-thread-";
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
+ t.setDaemon(true);
+ if (t.getPriority() != Thread.NORM_PRIORITY) {
+ t.setPriority(Thread.NORM_PRIORITY);
+ }
+ return t;
+ }
+ }
+
+ public long getPeriod() {
+ return period;
+ }
+
+ public void init(MetricRegistry registry,
+ String metricsName,
+ String hostAndPort,
+ Map<String, String> metricsProperties) {
+ this.registry = registry;
+ this.metricsName = metricsName;
+ this.executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(metricsName));
+ this.rateFactor = TimeUnit.SECONDS.toSeconds(1);
+ this.rateUnit = calculateRateUnit(TimeUnit.MILLISECONDS);
+ this.durationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
+ this.durationUnit = TimeUnit.MILLISECONDS.toString().toLowerCase(Locale.US);
+ this.metricsProperties = metricsProperties;
+ this.metricsPropertyKey = metricsName + "." + getReporterName() + ".";
+ this.hostAndPort = hostAndPort;
+
+ MetricsFilterList filterList = new MetricsFilterList();
+ filterList.addMetricFilter(new GroupNameMetricsFilter(metricsName));
+
+ String regexpFilterKey = metricsPropertyKey + "regexp.";
+ Set<String> regexpExpressions = new HashSet<String>();
+
+ for(Map.Entry<String, String> entry: metricsProperties.entrySet()) {
+ String key = entry.getKey();
+ if(key.indexOf(regexpFilterKey) == 0) {
+ regexpExpressions.add(entry.getValue());
+ }
+ }
+
+ if(!regexpExpressions.isEmpty()) {
+ filterList.addMetricFilter(new RegexpMetricsFilter(regexpExpressions));
+ }
+ this.filter = filterList;
+
+ this.period = 60;
+ if(metricsProperties != null && metricsProperties.get(metricsPropertyKey + PERIOD_KEY) != null) {
+ this.period = Integer.parseInt(metricsProperties.get(metricsPropertyKey + PERIOD_KEY));
+ }
+ afterInit();
+ }
+
+ public void start() {
+ start(period, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Starts the reporter polling at the given period.
+ *
+ * @param period the amount of time between polls
+ * @param unit the unit for {@code period}
+ */
+ public void start(long period, TimeUnit unit) {
+ this.period = period;
+ executor.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ report();
+ } catch (Exception e) {
+ if(LOG.isDebugEnabled()) {
+ LOG.warn("Metric report error:" + e.getMessage(), e);
+ } else {
+ LOG.warn("Metric report error:" + e.getMessage(), e);
+ }
+ }
+ }
+ }, period, period, unit);
+ }
+
+ /**
+ * Stops the reporter and shuts down its thread of execution.
+ */
+ public void stop() {
+ executor.shutdown();
+ try {
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException ignored) {
+ // do nothing
+ }
+ }
+
+ /**
+ * Stops the reporter and shuts down its thread of execution.
+ */
+ @Override
+ public void close() {
+ stop();
+ }
+
+ /**
+ * Report the current values of all metrics in the registry.
+ */
+ public void report() {
+ report(registry.getGauges(filter),
+ registry.getCounters(filter),
+ registry.getHistograms(filter),
+ registry.getMeters(filter),
+ registry.getTimers(filter));
+ }
+
+ protected String getRateUnit() {
+ return rateUnit;
+ }
+
+ protected String getDurationUnit() {
+ return durationUnit;
+ }
+
+ protected double convertDuration(double duration) {
+ return duration * durationFactor;
+ }
+
+ protected double convertRate(double rate) {
+ return rate * rateFactor;
+ }
+
+ private String calculateRateUnit(TimeUnit unit) {
+ final String s = unit.toString().toLowerCase(Locale.US);
+ return s.substring(0, s.length() - 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 1f3445a..986f453 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -18,6 +18,7 @@
package org.apache.tajo.worker;
+import com.codahale.metrics.Gauge;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -50,6 +51,7 @@ import org.apache.tajo.storage.v2.DiskUtil;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.util.metrics.TajoSystemMetrics;
import org.apache.tajo.webapp.StaticHttpServer;
import java.io.*;
@@ -121,6 +123,8 @@ public class TajoWorker extends CompositeService {
private DeletionService deletionService;
+ private TajoSystemMetrics workerSystemMetrics;
+
public TajoWorker() throws Exception {
super(TajoWorker.class.getName());
}
@@ -261,6 +265,33 @@ public class TajoWorker extends CompositeService {
}
+  /**
+   * Creates and starts the "worker" metrics system, then registers the two
+   * worker-level gauges (running queries and running tasks).
+   */
+  private void initWorkerMetrics() {
+    workerSystemMetrics = new TajoSystemMetrics(systemConf, "worker", workerContext.getWorkerName());
+    workerSystemMetrics.start();
+
+    // Queries currently tracked by the query master; 0 when no query master service exists.
+    Gauge<Integer> runningQueries = new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        if (queryMasterManagerService == null) {
+          return 0;
+        }
+        return queryMasterManagerService.getQueryMaster().getQueryMasterTasks().size();
+      }
+    };
+    workerSystemMetrics.register("querymaster", "runningQueries", runningQueries);
+
+    // Tasks reported by the task runner manager; 0 when no manager exists.
+    Gauge<Integer> runningTasks = new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        if (taskRunnerManager == null) {
+          return 0;
+        }
+        return taskRunnerManager.getNumTasks();
+      }
+    };
+    workerSystemMetrics.register("task", "runningTasks", runningTasks);
+  }
+
public WorkerContext getWorkerContext() {
return workerContext;
}
@@ -268,6 +299,7 @@ public class TajoWorker extends CompositeService {
@Override
public void start() {
super.start();
+ initWorkerMetrics(); // metrics exist only after super.start() returns. NOTE(review): TajoWorkerManagerService.executeExecutionBlock calls getWorkerSystemMetrics() without a null check - confirm no RPC can be served before this line
}
@Override
@@ -302,6 +334,10 @@ public class TajoWorker extends CompositeService {
}
}
+ if(workerSystemMetrics != null) {
+ workerSystemMetrics.stop();
+ }
+
if(deletionService != null) deletionService.stop();
super.stop();
LOG.info("TajoWorker main thread exiting");
@@ -432,6 +468,10 @@ public class TajoWorker extends CompositeService {
public boolean isTaskRunnerMode() {
return taskRunnerMode;
}
+
+ public TajoSystemMetrics getWorkerSystemMetrics() { // returns the workerSystemMetrics field; it is null until initWorkerMetrics() assigns it (no other assignment is visible in this patch)
+ return workerSystemMetrics;
+ }
}
public void stopWorkerForce() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index d10b53d..bd2dc4d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -111,6 +111,7 @@ public class TajoWorkerManagerService extends CompositeService
public void executeExecutionBlock(RpcController controller,
TajoWorkerProtocol.RunExecutionBlockRequestProto request,
RpcCallback<PrimitiveProtos.BoolProto> done) {
+ workerContext.getWorkerSystemMetrics().counter("query", "executedExecutionBlocksNum").inc();
try {
String[] params = new String[7];
params[0] = "standby"; //mode(never used)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 1920f25..18c312f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -352,7 +352,7 @@ public class TaskRunner extends AbstractService {
taskRunnerManager.stopTask(getId());
}
} else {
-
+ taskRunnerManager.getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();
LOG.info("Accumulated Received Task: " + (++receivedNum));
QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 1ea213d..6523a4f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -43,6 +43,10 @@ public class TaskRunnerManager extends CompositeService {
this.workerContext = workerContext;
}
+ public TajoWorker.WorkerContext getWorkerContext() { // exposes the workerContext field so collaborators such as TaskRunner can reach the worker's metrics system
+ return workerContext;
+ }
+
@Override
public void init(Configuration conf) {
tajoConf = (TajoConf)conf;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/resources/tajo-metrics.properties
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/tajo-metrics.properties b/tajo-core/tajo-core-backend/src/main/resources/tajo-metrics.properties
new file mode 100644
index 0000000..4ae6a6c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/resources/tajo-metrics.properties
@@ -0,0 +1,75 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+###############################################################################
+# reporter definition
+# syntax: reporter.<name>=<class>
+reporter.null=org.apache.tajo.util.metrics.reporter.NullReporter
+reporter.file=org.apache.tajo.util.metrics.reporter.MetricsFileScheduledReporter
+reporter.console=org.apache.tajo.util.metrics.reporter.MetricsConsoleScheduledReporter
+reporter.ganglia=org.apache.tajo.util.metrics.reporter.GangliaReporter
+###############################################################################
+
+###############################################################################
+# syntax: <metrics group name>.reporters=<reporter name1>[,<reporter name2>,...]
+# syntax: <metrics group name>.<reporter name>.<options>=<value>
+###############################################################################
+
+###############################################################################
+# tajo master
+###############################################################################
+tajomaster.reporters=null
+
+#tajomaster.reporters=file,console
+#tajomaster.console.period=60
+#tajomaster.file.filename=/tmp/tajo/tajomaster-metrics.out
+#tajomaster.file.period=60
+#tajomaster.ganglia.server=my.ganglia.com
+#tajomaster.ganglia.port=8649
+#tajomaster.ganglia.period=60
+###############################################################################
+
+###############################################################################
+# tajo master-jvm
+###############################################################################
+tajomaster-jvm.reporters=null
+#tajomaster-jvm.reporters=console
+#tajomaster-jvm.console.period=60
+#tajomaster-jvm.file.filename=/tmp/tajo/tajomaster-jvm-metrics.out
+#tajomaster-jvm.file.period=60
+###############################################################################
+
+###############################################################################
+# worker
+###############################################################################
+worker.reporters=null
+#worker.reporters=file,console
+#worker.console.period=60
+#worker.file.filename=/tmp/tajo/worker-metrics.out
+#worker.file.period=60
+###############################################################################
+
+###############################################################################
+# worker-jvm
+###############################################################################
+worker-jvm.reporters=null
+#worker-jvm.reporters=console
+#worker-jvm.console.period=60
+#worker-jvm.file.filename=/tmp/tajo/worker-jvm-metrics.out
+#worker-jvm.file.period=60
+###############################################################################
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java
new file mode 100644
index 0000000..b70512c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/metrics/TestMetricsFilter.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestMetricsFilter {
+ @Test
+ public void testGroupNameMetricsFilter() {
+ GroupNameMetricsFilter filter = new GroupNameMetricsFilter("tajomaster");
+
+ assertTrue(filter.matches("tajomaster.JVM.Heap.memFree", null));
+ assertTrue(!filter.matches("tajomaster01.JVM.Heap.memFree", null));
+ assertTrue(!filter.matches("server.tajomaster.JVM.Heap.memFree", null));
+ assertTrue(!filter.matches("tajworker.JVM.Heap.memFree", null));
+ }
+
+ @Test
+ public void testRegexpMetricsFilter() {
+ List<String> filterExpressions = new ArrayList<String>();
+ filterExpressions.add("JVM");
+ filterExpressions.add("Query");
+
+ RegexpMetricsFilter filter = new RegexpMetricsFilter(filterExpressions);
+
+ assertTrue(filter.matches("tajomaster.JVM.Heap.memFree", null));
+ assertTrue(filter.matches("tajomaster.Query.numQuery", null));
+
+ assertTrue(!filter.matches("tajomaster.resource.numWorker", null));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java
new file mode 100644
index 0000000..a4af64c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Counter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.metrics.reporter.TajoMetricsScheduledReporter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStreamReader;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * End-to-end test of {@link TajoSystemMetrics} with a file reporter: writes a
+ * temporary metrics properties file, registers counters, and checks both the
+ * registry contents and the reporter's output file.
+ */
+public class TestSystemMetrics {
+  private Path testPropertyFile;
+  private Path metricsOutputFile;
+
+  /** Writes the metrics properties file used by the test. */
+  @Before
+  public void setUp() throws Exception {
+    testPropertyFile =
+        new Path(CommonTestingUtil.getTestDir(), System.currentTimeMillis() + ".properties");
+
+    metricsOutputFile =
+        new Path(CommonTestingUtil.getTestDir(), System.currentTimeMillis() + ".out");
+
+    FileOutputStream out = new FileOutputStream(testPropertyFile.toUri().getPath());
+    try {
+      out.write("reporter.null=org.apache.tajo.util.metrics.reporter.NullReporter\n".getBytes());
+      out.write("reporter.file=org.apache.tajo.util.metrics.reporter.MetricsFileScheduledReporter\n".getBytes());
+      out.write("reporter.console=org.apache.tajo.util.metrics.reporter.MetricsConsoleScheduledReporter\n".getBytes());
+
+      out.write("test-file-group.reporters=file\n".getBytes());
+      out.write("test-console-group.reporters=console\n".getBytes());
+      out.write("test-find-console-group.reporters=console,file\n".getBytes());
+
+      out.write(("test-file-group.file.filename=" + metricsOutputFile.toUri().getPath() + "\n").getBytes());
+      out.write("test-file-group.file.period=5\n".getBytes());
+    } finally {
+      // Close so the handle is released before the test reads the file and tearDown deletes it.
+      out.close();
+    }
+  }
+
+  @Test
+  public void testMetricsReporter() throws Exception {
+    TajoConf tajoConf = new TajoConf();
+    tajoConf.set("tajo.metrics.property.file", testPropertyFile.toUri().getPath());
+    TajoSystemMetrics tajoSystemMetrics = new TajoSystemMetrics(tajoConf, "test-file-group", "localhost");
+    tajoSystemMetrics.start();
+
+    try {
+      Collection<TajoMetricsScheduledReporter> reporters = tajoSystemMetrics.getMetricsReporters();
+
+      assertEquals(1, reporters.size());
+
+      TajoMetricsScheduledReporter reporter = reporters.iterator().next();
+      assertEquals(5, reporter.getPeriod());
+
+      for (int i = 0; i < 10; i++) {
+        tajoSystemMetrics.counter("test-group01", "test-item1").inc();
+        tajoSystemMetrics.counter("test-group01", "test-item2").inc(2);
+        tajoSystemMetrics.counter("test-group02", "test-item1").inc(3);
+      }
+
+      SortedMap<String, Counter> counterMap = tajoSystemMetrics.getRegistry().getCounters();
+      Counter counter1 = counterMap.get("test-file-group.test-group01.test-item1");
+      assertNotNull(counter1);
+      assertEquals(10, counter1.getCount());
+
+      Counter counter2 = counterMap.get("test-file-group.test-group01.test-item2");
+      assertNotNull(counter2);
+      assertEquals(20, counter2.getCount());
+
+      //test findMetricsItemGroup method
+      Map<String, Map<String, Counter>> groupItems = reporter.findMetricsItemGroup(counterMap);
+      assertEquals(2, groupItems.size());
+
+      Map<String, Counter> group01Items = groupItems.get("test-file-group.test-group01");
+      assertEquals(2, group01Items.size());
+
+      counter1 = group01Items.get("test-item1");
+      assertNotNull(counter1);
+      assertEquals(10, counter1.getCount());
+
+      counter2 = group01Items.get("test-item2");
+      assertNotNull(counter2);
+      assertEquals(20, counter2.getCount());
+
+      Map<String, Counter> group02Items = groupItems.get("test-file-group.test-group02");
+      assertEquals(1, group02Items.size());
+
+      // Report explicitly rather than waiting for the 5 second schedule.
+      reporter.report();
+
+      List<String> lines = new ArrayList<String>();
+      BufferedReader reader = new BufferedReader(new InputStreamReader(
+          new FileInputStream(metricsOutputFile.toUri().getPath())));
+      try {
+        String line;
+        while ((line = reader.readLine()) != null) {
+          lines.add(line);
+        }
+      } finally {
+        reader.close();
+      }
+
+      assertEquals(2, lines.size());
+    } finally {
+      // Stop the metrics system so it does not keep running into later tests.
+      tajoSystemMetrics.stop();
+    }
+  }
+
+  /** Removes the files created by {@link #setUp()} and by the reporter. */
+  @After
+  public void tearDown() throws Exception {
+    FileSystem fs = testPropertyFile.getFileSystem(new Configuration());
+    fs.delete(testPropertyFile, false);
+    fs.delete(metricsOutputFile, false);
+  }
+}
[2/2] git commit: TAJO-333: Add metric system to Tajo. (hyoungjunkim
via jihoon)
Posted by ji...@apache.org.
TAJO-333: Add metric system to Tajo. (hyoungjunkim via jihoon)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/62c49c05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/62c49c05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/62c49c05
Branch: refs/heads/master
Commit: 62c49c05f522158d75d818df49100a0b1fd354bb
Parents: 1d0d458
Author: Jihoon Son <ji...@apache.org>
Authored: Sat Dec 14 12:32:41 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Sat Dec 14 12:33:40 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../java/org/apache/tajo/conf/TajoConf.java | 5 +-
tajo-core/tajo-core-backend/pom.xml | 16 ++
.../src/main/java/log4j.properties | 6 +-
.../org/apache/tajo/master/GlobalEngine.java | 7 +-
.../java/org/apache/tajo/master/TajoMaster.java | 26 +-
.../master/metrics/CatalogMetricsGaugeSet.java | 54 ++++
.../metrics/WorkerResourceMetricsGaugeSet.java | 74 ++++++
.../querymaster/QueryMasterManagerService.java | 3 +
.../master/querymaster/QueryMasterTask.java | 13 +
.../util/metrics/GroupNameMetricsFilter.java | 43 ++++
.../tajo/util/metrics/LogEventGaugeSet.java | 64 +++++
.../tajo/util/metrics/MetricsFilterList.java | 43 ++++
.../tajo/util/metrics/RegexpMetricsFilter.java | 51 ++++
.../tajo/util/metrics/TajoLogEventCounter.java | 86 +++++++
.../apache/tajo/util/metrics/TajoMetrics.java | 133 ++++++++++
.../tajo/util/metrics/TajoSystemMetrics.java | 213 +++++++++++++++
.../util/metrics/reporter/GangliaReporter.java | 258 +++++++++++++++++++
.../reporter/MetricsConsoleReporter.java | 80 ++++++
.../MetricsConsoleScheduledReporter.java | 32 +++
.../reporter/MetricsFileScheduledReporter.java | 57 ++++
.../MetricsStreamScheduledReporter.java | 179 +++++++++++++
.../util/metrics/reporter/NullReporter.java | 31 +++
.../metrics/reporter/TajoMetricsReporter.java | 232 +++++++++++++++++
.../reporter/TajoMetricsScheduledReporter.java | 206 +++++++++++++++
.../java/org/apache/tajo/worker/TajoWorker.java | 40 +++
.../tajo/worker/TajoWorkerManagerService.java | 1 +
.../java/org/apache/tajo/worker/TaskRunner.java | 2 +-
.../apache/tajo/worker/TaskRunnerManager.java | 4 +
.../src/main/resources/tajo-metrics.properties | 75 ++++++
.../tajo/util/metrics/TestMetricsFilter.java | 52 ++++
.../tajo/util/metrics/TestSystemMetrics.java | 133 ++++++++++
32 files changed, 2215 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 452cace..2f7d1db 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@ Release 0.8.0 - unreleased
NEW FEATURES
+ TAJO-333: Add metric system to Tajo. (hyoungjunkim via jihoon)
+
TAJO-413: Implement pi function. (DaeMyung Kang via jihoon)
TAJO-61: Implement Time Datum Type. (DaeMyung Kang via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 0d9bbb0..fb1c29b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -195,8 +195,11 @@ public class TajoConf extends YarnConfiguration {
//////////////////////////////////
// Task Configuration
TASK_DEFAULT_MEMORY("tajo.task.memory-slot-mb.default", 512),
- TASK_DEFAULT_DISK("tajo.task.disk-slot.default", 1.0f)
+ TASK_DEFAULT_DISK("tajo.task.disk-slot.default", 1.0f),
//////////////////////////////////
+
+ // Metrics
+ METRICS_PROPERTY_FILENAME("tajo.metrics.property.file", "tajo-metrics.properties")
;
public final String varname;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/pom.xml b/tajo-core/tajo-core-backend/pom.xml
index fca3372..07117a6 100644
--- a/tajo-core/tajo-core-backend/pom.xml
+++ b/tajo-core/tajo-core-backend/pom.xml
@@ -35,6 +35,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<antlr4.visitor>true</antlr4.visitor>
<antlr4.listener>true</antlr4.listener>
+ <metrics.version>3.0.1</metrics.version>
</properties>
<repositories>
@@ -379,6 +380,21 @@
<version>6.1.14</version>
</dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-jvm</artifactId>
+ <version>${metrics.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>info.ganglia.gmetric4j</groupId>
+ <artifactId>gmetric4j</artifactId>
+ <version>1.0.3</version>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/log4j.properties
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/log4j.properties b/tajo-core/tajo-core-backend/src/main/java/log4j.properties
index 749124c..15e5778 100644
--- a/tajo-core/tajo-core-backend/src/main/java/log4j.properties
+++ b/tajo-core/tajo-core-backend/src/main/java/log4j.properties
@@ -18,11 +18,13 @@
# log4j configuration used during build and unit tests
-log4j.rootLogger=info,stdout
+log4j.rootLogger=info,stdout,EventCounter
log4j.threshhold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
log4j.logger.org.apache.hadoop=WARN
-log4j.logger.org.apache.hadoop.conf=ERROR
\ No newline at end of file
+log4j.logger.org.apache.hadoop.conf=ERROR
+
+log4j.appender.EventCounter=org.apache.tajo.util.metrics.TajoLogEventCounter
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 2412637..10f42c5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -92,7 +92,6 @@ public class GlobalEngine extends AbstractService {
hookManager = new DistributedQueryHookManager();
hookManager.addHook(new CreateTableHook());
hookManager.addHook(new InsertHook());
-
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
}
@@ -128,9 +127,12 @@ public class GlobalEngine extends AbstractService {
LOG.info("hive.query.mode:" + hiveQueryMode);
if (hiveQueryMode) {
+ context.getSystemMetrics().counter("Query", "numHiveMode").inc();
queryContext.setHiveQueryMode();
}
+ context.getSystemMetrics().counter("Query", "totalQuery").inc();
+
Expr planningContext = hiveQueryMode ? converter.parse(sql) : analyzer.parse(sql);
LogicalPlan plan = createLogicalPlan(planningContext);
@@ -138,11 +140,13 @@ public class GlobalEngine extends AbstractService {
GetQueryStatusResponse.Builder responseBuilder = GetQueryStatusResponse.newBuilder();
if (PlannerUtil.checkIfDDLPlan(rootNode)) {
+ context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
updateQuery(rootNode.getChild());
responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
responseBuilder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
} else {
+ context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
hookManager.doHooks(queryContext, plan);
QueryJobManager queryJobManager = this.context.getQueryJobManager();
@@ -169,6 +173,7 @@ public class GlobalEngine extends AbstractService {
return response;
} catch (Throwable t) {
+ context.getSystemMetrics().counter("Query", "errorQuery").inc();
LOG.error("\nStack Trace:\n" + StringUtils.stringifyException(t));
GetQueryStatusResponse.Builder responseBuilder = GetQueryStatusResponse.newBuilder();
responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
index f98cecb..e3d4c01 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -50,6 +50,8 @@ import org.apache.tajo.engine.function.datetime.ToCharTimestamp;
import org.apache.tajo.engine.function.datetime.ToTimestamp;
import org.apache.tajo.engine.function.math.*;
import org.apache.tajo.engine.function.string.*;
+import org.apache.tajo.master.metrics.CatalogMetricsGaugeSet;
+import org.apache.tajo.master.metrics.WorkerResourceMetricsGaugeSet;
import org.apache.tajo.master.querymaster.QueryJobManager;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.master.rm.WorkerResourceManager;
@@ -57,6 +59,7 @@ import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.StorageManagerFactory;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.metrics.TajoSystemMetrics;
import org.apache.tajo.webapp.QueryExecutorServlet;
import org.apache.tajo.webapp.StaticHttpServer;
@@ -70,6 +73,7 @@ import java.util.ArrayList;
import java.util.List;
public class TajoMaster extends CompositeService {
+ private static final String METRICS_GROUP_NAME = "tajomaster";
/** Class Logger */
private static final Log LOG = LogFactory.getLog(TajoMaster.class);
@@ -119,6 +123,8 @@ public class TajoMaster extends CompositeService {
private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+ private TajoSystemMetrics systemMetrics;
+
public TajoMaster() throws Exception {
super(TajoMaster.class.getName());
}
@@ -180,6 +186,14 @@ public class TajoMaster extends CompositeService {
LOG.info("Tajo Master is initialized.");
}
+ private void initSystemMetrics() { // creates, starts and populates the master's TajoSystemMetrics
+ systemMetrics = new TajoSystemMetrics(systemConf, METRICS_GROUP_NAME, getMasterName()); // group name is the "tajomaster" constant
+ systemMetrics.start(); // started before the gauge sets below are registered
+
+ systemMetrics.register("resource", new WorkerResourceMetricsGaugeSet(context)); // worker gauges under the "resource" context
+ systemMetrics.register("catalog", new CatalogMetricsGaugeSet(context)); // catalog gauges under the "catalog" context
+ }
+
private void initResourceManager() throws Exception {
Class<WorkerResourceManager> resourceManagerClass = (Class<WorkerResourceManager>)
systemConf.getClass(ConfVars.RESOURCE_MANAGER_CLASS.varname, TajoWorkerResourceManager.class);
@@ -826,8 +840,10 @@ public class TajoMaster extends CompositeService {
try {
writeSystemConf();
} catch (IOException e) {
- e.printStackTrace();
+ LOG.error(e.getMessage(), e);
}
+
+ initSystemMetrics();
}
private void writeSystemConf() throws IOException {
@@ -862,6 +878,10 @@ public class TajoMaster extends CompositeService {
}
}
+ if(systemMetrics != null) {
+ systemMetrics.stop();
+ }
+
super.stop();
LOG.info("Tajo Master main thread exiting");
}
@@ -928,6 +948,10 @@ public class TajoMaster extends CompositeService {
public TajoMasterService getTajoMasterService() {
return tajoMasterService;
}
+
+ public TajoSystemMetrics getSystemMetrics() { // exposes the master's system metrics to callers holding this context
+ return systemMetrics; // NOTE(review): field has no initializer and is assigned in initSystemMetrics(); presumably null before that - confirm
+ }
}
String getThreadTaskName(long id, String name) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
new file mode 100644
index 0000000..08fff53
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+import org.apache.tajo.master.TajoMaster;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Metric set that publishes catalog-size gauges for the Tajo master:
+ * the number of tables and the number of functions known to the catalog.
+ */
+public class CatalogMetricsGaugeSet implements MetricSet {
+  TajoMaster.MasterContext tajoMasterContext;
+
+  public CatalogMetricsGaugeSet(TajoMaster.MasterContext tajoMasterContext) {
+    this.tajoMasterContext = tajoMasterContext;
+  }
+
+  /**
+   * Builds a new map containing the "numTables" and "numFunctions" gauges.
+   * Each gauge asks the master context's catalog again whenever its value is read.
+   *
+   * @return a freshly allocated map from gauge name to gauge
+   */
+  @Override
+  public Map<String, Metric> getMetrics() {
+    Gauge<Integer> tableCount = new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return tajoMasterContext.getCatalog().getAllTableNames().size();
+      }
+    };
+
+    Gauge<Integer> functionCount = new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return tajoMasterContext.getCatalog().getFunctions().size();
+      }
+    };
+
+    Map<String, Metric> gauges = new HashMap<String, Metric>();
+    gauges.put("numTables", tableCount);
+    gauges.put("numFunctions", functionCount);
+    return gauges;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java
new file mode 100644
index 0000000..1924041
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.master.rm.WorkerStatus;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Metric set that publishes worker counts taken from the master's resource
+ * manager: all registered workers, plus the LIVE and DEAD subsets.
+ */
+public class WorkerResourceMetricsGaugeSet implements MetricSet {
+  TajoMaster.MasterContext tajoMasterContext;
+
+  public WorkerResourceMetricsGaugeSet(TajoMaster.MasterContext tajoMasterContext) {
+    this.tajoMasterContext = tajoMasterContext;
+  }
+
+  /**
+   * Builds a new map containing the "totalWorkers", "liveWorkers" and
+   * "deadWorkers" gauges. Values are recomputed on every read.
+   *
+   * @return a freshly allocated map from gauge name to gauge
+   */
+  @Override
+  public Map<String, Metric> getMetrics() {
+    Gauge<Integer> total = new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return tajoMasterContext.getResourceManager().getWorkers().size();
+      }
+    };
+
+    Map<String, Metric> gauges = new HashMap<String, Metric>();
+    gauges.put("totalWorkers", total);
+    gauges.put("liveWorkers", gaugeFor(WorkerStatus.LIVE));
+    gauges.put("deadWorkers", gaugeFor(WorkerStatus.DEAD));
+    return gauges;
+  }
+
+  /** Creates a gauge whose value is the current number of workers in the given status. */
+  private Gauge<Integer> gaugeFor(final WorkerStatus status) {
+    return new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return getNumWorkers(status);
+      }
+    };
+  }
+
+  /**
+   * Counts the workers whose status is identical to the requested one.
+   *
+   * @param status the status to look for
+   * @return how many of the resource manager's workers currently report that status
+   */
+  protected int getNumWorkers(WorkerStatus status) {
+    int matched = 0;
+    for (WorkerResource worker : tajoMasterContext.getResourceManager().getWorkers().values()) {
+      if (worker.getWorkerStatus() != status) {
+        continue;
+      }
+      matched++;
+    }
+    return matched;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index 4f0e128..78c417e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -194,6 +194,8 @@ public class QueryMasterManagerService extends CompositeService
TajoWorkerProtocol.QueryExecutionRequestProto request,
RpcCallback<PrimitiveProtos.BoolProto> done) {
try {
+ workerContext.getWorkerSystemMetrics().counter("querymaster", "numQuery").inc();
+
QueryId queryId = new QueryId(request.getQueryId());
LOG.info("Receive executeQuery request:" + queryId);
queryMaster.handle(new QueryStartEvent(queryId,
@@ -201,6 +203,7 @@ public class QueryMasterManagerService extends CompositeService
request.getLogicalPlanJson().getValue()));
done.run(TajoWorker.TRUE_PROTO);
} catch (Exception e) {
+ workerContext.getWorkerSystemMetrics().counter("querymaster", "errorQuery").inc();
LOG.error(e.getMessage(), e);
done.run(TajoWorker.FALSE_PROTO);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 8cd7d45..828ebfa 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -51,6 +51,8 @@ import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.util.metrics.TajoMetrics;
+import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
import org.apache.tajo.worker.AbstractResourceAllocator;
import org.apache.tajo.worker.TajoResourceAllocator;
import org.apache.tajo.worker.YarnResourceAllocator;
@@ -99,6 +101,8 @@ public class QueryMasterTask extends CompositeService {
private AtomicBoolean stopped = new AtomicBoolean(false);
+ private TajoMetrics queryMetrics;
+
public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
QueryId queryId, QueryContext queryContext, String sql, String logicalPlanJson) {
super(QueryMasterTask.class.getName());
@@ -136,6 +140,8 @@ public class QueryMasterTask extends CompositeService {
initStagingDir();
+ queryMetrics = new TajoMetrics(queryId.toString());
+
super.init(systemConf);
} catch (IOException e) {
LOG.error(e.getMessage(), e);
@@ -186,6 +192,9 @@ public class QueryMasterTask extends CompositeService {
super.stop();
+ //TODO change report to tajo master
+ queryMetrics.report(new MetricsConsoleReporter());
+
LOG.info("Stopped QueryMasterTask:" + queryId);
}
@@ -493,6 +502,10 @@ public class QueryMasterTask extends CompositeService {
public AbstractResourceAllocator getResourceAllocator() {
return resourceAllocator;
}
+
+ public TajoMetrics getQueryMetrics() { // exposes the per-query TajoMetrics, which is constructed with queryId.toString() as its group name
+ return queryMetrics; // NOTE(review): assigned after initStagingDir() during initialization; presumably null if that step failed - confirm
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/GroupNameMetricsFilter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/GroupNameMetricsFilter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/GroupNameMetricsFilter.java
new file mode 100644
index 0000000..a273475
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/GroupNameMetricsFilter.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+
+/**
+ * Metric filter that accepts a metric when the part of its name before the
+ * first '.' equals the configured group name.
+ */
+public class GroupNameMetricsFilter implements MetricFilter {
+  String groupName;
+
+  /**
+   * @param groupName the leading name segment a metric must have to be accepted
+   */
+  public GroupNameMetricsFilter(String groupName) {
+    this.groupName = groupName;
+  }
+
+  /**
+   * @param name   dotted metric name; {@code null} never matches
+   * @param metric the metric itself (not inspected)
+   * @return {@code true} when the text before the first '.' (or the whole name
+   *         if it contains no '.') equals the group name
+   */
+  @Override
+  public boolean matches(String name, Metric metric) {
+    if (name == null) {
+      return false;
+    }
+    // Take the leading segment by position rather than with split("\\."):
+    // split drops trailing empty strings, so a name made only of dots yields
+    // an empty array and indexing element 0 would throw.
+    int firstDot = name.indexOf('.');
+    String leadingSegment = firstDot < 0 ? name : name.substring(0, firstDot);
+    return groupName.equals(leadingSegment);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java
new file mode 100644
index 0000000..6e130ff
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/LogEventGaugeSet.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Metric set that republishes the static counters of {@link TajoLogEventCounter}
+ * as four gauges named "Fatal", "Error", "Warn" and "Info".
+ */
+public class LogEventGaugeSet implements MetricSet {
+
+  /**
+   * Builds a new map with one gauge per counted log level. Every gauge reads
+   * the corresponding static getter of {@link TajoLogEventCounter} on each call.
+   *
+   * @return a freshly allocated map from gauge name to gauge
+   */
+  @Override
+  public Map<String, Metric> getMetrics() {
+    Gauge<Long> fatal = new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return TajoLogEventCounter.getFatal();
+      }
+    };
+
+    Gauge<Long> error = new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return TajoLogEventCounter.getError();
+      }
+    };
+
+    Gauge<Long> warn = new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return TajoLogEventCounter.getWarn();
+      }
+    };
+
+    Gauge<Long> info = new Gauge<Long>() {
+      @Override
+      public Long getValue() {
+        return TajoLogEventCounter.getInfo();
+      }
+    };
+
+    final Map<String, Metric> byLevel = new HashMap<String, Metric>();
+    byLevel.put("Fatal", fatal);
+    byLevel.put("Error", error);
+    byLevel.put("Warn", warn);
+    byLevel.put("Info", info);
+    return byLevel;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/MetricsFilterList.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/MetricsFilterList.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/MetricsFilterList.java
new file mode 100644
index 0000000..b2fc6e4
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/MetricsFilterList.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Composite metric filter: a metric is accepted only if every filter that has
+ * been added accepts it. With no filters added, every metric is accepted.
+ */
+public class MetricsFilterList implements MetricFilter {
+  List<MetricFilter> filters = new ArrayList<MetricFilter>();
+
+  /** Appends a filter; filters are consulted in the order they were added. */
+  public void addMetricFilter(MetricFilter filter) {
+    filters.add(filter);
+  }
+
+  /**
+   * @return {@code false} as soon as one filter rejects the metric (later
+   *         filters are not consulted), {@code true} otherwise
+   */
+  @Override
+  public boolean matches(String name, Metric metric) {
+    for (MetricFilter candidate : filters) {
+      if (candidate.matches(name, metric)) {
+        continue;
+      }
+      return false;
+    }
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java
new file mode 100644
index 0000000..4faa3e7
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/RegexpMetricsFilter.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Metric filter driven by regular expressions: a metric is accepted when any
+ * of the configured patterns is found somewhere in its name. When no pattern
+ * is configured, every metric is accepted.
+ */
+public class RegexpMetricsFilter implements MetricFilter {
+  List<Pattern> filterPatterns = new ArrayList<Pattern>();
+
+  /**
+   * Compiles every expression once, up front.
+   *
+   * @param filterExpressions regular expressions to look for in metric names
+   * @throws java.util.regex.PatternSyntaxException if an expression is not a valid regex
+   */
+  public RegexpMetricsFilter(Collection<String> filterExpressions) {
+    for (String eachExpression : filterExpressions) {
+      filterPatterns.add(Pattern.compile(eachExpression));
+    }
+  }
+
+  /**
+   * @param name   metric name to test
+   * @param metric the metric itself (not inspected)
+   * @return {@code true} if no pattern is configured, or if at least one
+   *         pattern is found in {@code name}; {@code false} otherwise,
+   *         including for a {@code null} name when patterns are configured
+   */
+  @Override
+  public boolean matches(String name, Metric metric) {
+    if (filterPatterns.isEmpty()) {
+      return true;
+    }
+
+    // Pattern.matcher(null) throws; a nameless metric cannot match any
+    // pattern, so reject it the same way GroupNameMetricsFilter does.
+    if (name == null) {
+      return false;
+    }
+
+    for (Pattern eachPattern : filterPatterns) {
+      // find() rather than matches(): the pattern may hit any substring.
+      if (eachPattern.matcher(name).find()) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoLogEventCounter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoLogEventCounter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoLogEventCounter.java
new file mode 100644
index 0000000..3e44b02
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoLogEventCounter.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * log4j appender that does not write anything; it only counts how many events
+ * were logged at INFO, WARN, ERROR and FATAL. The counts are static and are
+ * read through the static getters.
+ */
+public class TajoLogEventCounter extends AppenderSkeleton {
+  private static final int FATAL = 0;
+  private static final int ERROR = 1;
+  private static final int WARN = 2;
+  private static final int INFO = 3;
+
+  /** Four counters, indexed by the level constants above; access is synchronized. */
+  private static class EventCounts {
+
+    private final long[] counts = new long[4];
+
+    private synchronized void incr(int i) {
+      counts[i]++;
+    }
+
+    private synchronized long get(int i) {
+      return counts[i];
+    }
+  }
+
+  private static EventCounts counts = new EventCounts();
+
+  /** @return number of FATAL events seen so far */
+  public static long getFatal() {
+    return counts.get(FATAL);
+  }
+
+  /** @return number of ERROR events seen so far */
+  public static long getError() {
+    return counts.get(ERROR);
+  }
+
+  /** @return number of WARN events seen so far */
+  public static long getWarn() {
+    return counts.get(WARN);
+  }
+
+  /** @return number of INFO events seen so far */
+  public static long getInfo() {
+    return counts.get(INFO);
+  }
+
+  /**
+   * Increments the counter matching the event's level. Events at any other
+   * level are ignored.
+   */
+  @Override
+  public void append(LoggingEvent event) {
+    int slot = slotFor(event.getLevel());
+    if (slot >= 0) {
+      counts.incr(slot);
+    }
+  }
+
+  /**
+   * Maps a level to its counter index, or -1 when the level is not counted.
+   * A level matches either by identity with the log4j constant or by a
+   * case-insensitive comparison of its string form.
+   */
+  private static int slotFor(Level level) {
+    String label = level.toString();
+
+    if (level == Level.INFO || "INFO".equalsIgnoreCase(label)) {
+      return INFO;
+    }
+    if (level == Level.WARN || "WARN".equalsIgnoreCase(label)) {
+      return WARN;
+    }
+    if (level == Level.ERROR || "ERROR".equalsIgnoreCase(label)) {
+      return ERROR;
+    }
+    if (level == Level.FATAL || "FATAL".equalsIgnoreCase(label)) {
+      return FATAL;
+    }
+    return -1;
+  }
+
+  /** Nothing to release. */
+  @Override
+  public void close() {
+  }
+
+  /** This appender produces no output, so it needs no layout. */
+  @Override
+  public boolean requiresLayout() {
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoMetrics.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoMetrics.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoMetrics.java
new file mode 100644
index 0000000..0e378b2
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoMetrics.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.metrics.reporter.TajoMetricsReporter;
+
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Wrapper around a codahale {@link MetricRegistry} that names every metric
+ * {@code <group>.<context>.<item>}, where the group is fixed per instance.
+ */
+public class TajoMetrics {
+  private static final Log LOG = LogFactory.getLog(TajoMetrics.class);
+
+  protected MetricRegistry metricRegistry;
+  protected AtomicBoolean stop = new AtomicBoolean(false);
+  protected String metricsGroupName;
+
+  /**
+   * @param metricsGroupName leading name segment used for every metric created
+   *                         or registered through this instance
+   */
+  public TajoMetrics(String metricsGroupName) {
+    this.metricsGroupName = metricsGroupName;
+    this.metricRegistry = new MetricRegistry();
+  }
+
+  /** Raises the stop flag. This class only sets the flag; it does not read it. */
+  public void stop() {
+    stop.set(true);
+  }
+
+  public MetricRegistry getRegistry() {
+    return metricRegistry;
+  }
+
+  /**
+   * Hands a snapshot of all gauges, counters, histograms, meters and timers to
+   * the reporter. Reporting is best-effort: any exception is logged at WARN
+   * (with the stack trace only when debug logging is enabled) and not rethrown.
+   *
+   * @param reporter destination for the current metric values
+   */
+  public void report(TajoMetricsReporter reporter) {
+    try {
+      reporter.report(metricRegistry.getGauges(),
+          metricRegistry.getCounters(),
+          metricRegistry.getHistograms(),
+          metricRegistry.getMeters(),
+          metricRegistry.getTimers());
+    } catch (Exception e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.warn("Metric report error:" + e.getMessage(), e);
+      } else {
+        LOG.warn("Metric report error:" + e.getMessage());
+      }
+    }
+  }
+
+  public Map<String, Metric> getMetrics() {
+    return metricRegistry.getMetrics();
+  }
+
+  /**
+   * Returns the gauges accepted by the filter.
+   *
+   * @param filter filter to apply; {@code null} means "accept everything"
+   */
+  public SortedMap<String, Gauge> getGaugeMetrics(MetricFilter filter) {
+    return metricRegistry.getGauges(orAll(filter));
+  }
+
+  /**
+   * Misspelled original name of {@link #getGaugeMetrics(MetricFilter)}, kept so
+   * existing callers continue to compile. Prefer the correctly spelled method.
+   *
+   * @param filter filter to apply; {@code null} means "accept everything"
+   */
+  public SortedMap<String, Gauge> getGuageMetrics(MetricFilter filter) {
+    return getGaugeMetrics(filter);
+  }
+
+  /** @param filter filter to apply; {@code null} means "accept everything" */
+  public SortedMap<String, Counter> getCounterMetrics(MetricFilter filter) {
+    return metricRegistry.getCounters(orAll(filter));
+  }
+
+  /** @param filter filter to apply; {@code null} means "accept everything" */
+  public SortedMap<String, Histogram> getHistogramMetrics(MetricFilter filter) {
+    return metricRegistry.getHistograms(orAll(filter));
+  }
+
+  /** @param filter filter to apply; {@code null} means "accept everything" */
+  public SortedMap<String, Meter> getMeterMetrics(MetricFilter filter) {
+    return metricRegistry.getMeters(orAll(filter));
+  }
+
+  /** @param filter filter to apply; {@code null} means "accept everything" */
+  public SortedMap<String, Timer> getTimerMetrics(MetricFilter filter) {
+    return metricRegistry.getTimers(orAll(filter));
+  }
+
+  /** Registers a whole metric set under {@code <group>.<contextName>}. */
+  public void register(String contextName, MetricSet metricSet) {
+    metricRegistry.register(MetricRegistry.name(metricsGroupName, contextName), metricSet);
+  }
+
+  /** Registers a single gauge under {@code <group>.<contextName>.<itemName>}. */
+  public void register(String contextName, String itemName, Gauge gauge) {
+    metricRegistry.register(makeMetricsName(metricsGroupName, contextName, itemName), gauge);
+  }
+
+  /** Looks up the counter named {@code <group>.<contextName>.<itemName>} in the registry. */
+  public Counter counter(String contextName, String itemName) {
+    return metricRegistry.counter(makeMetricsName(metricsGroupName, contextName, itemName));
+  }
+
+  /** Looks up the histogram named {@code <group>.<contextName>.<itemName>} in the registry. */
+  public Histogram histogram(String contextName, String itemName) {
+    return metricRegistry.histogram(makeMetricsName(metricsGroupName, contextName, itemName));
+  }
+
+  /** Looks up the meter named {@code <group>.<contextName>.<itemName>} in the registry. */
+  public Meter meter(String contextName, String itemName) {
+    return metricRegistry.meter(makeMetricsName(metricsGroupName, contextName, itemName));
+  }
+
+  /** Looks up the timer named {@code <group>.<contextName>.<itemName>} in the registry. */
+  public Timer timer(String contextName, String itemName) {
+    return metricRegistry.timer(makeMetricsName(metricsGroupName, contextName, itemName));
+  }
+
+  /** Builds a metric name from its three segments via {@link MetricRegistry#name(String, String...)}. */
+  public static String makeMetricsName(String metricsGroupName, String contextName, String itemName) {
+    return MetricRegistry.name(metricsGroupName, contextName, itemName);
+  }
+
+  /** Substitutes {@link MetricFilter#ALL} for a missing filter. */
+  private static MetricFilter orAll(MetricFilter filter) {
+    return filter == null ? MetricFilter.ALL : filter;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java
new file mode 100644
index 0000000..4192ca0
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/TajoSystemMetrics.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.jvm.FileDescriptorRatioGauge;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.event.ConfigurationEvent;
+import org.apache.commons.configuration.event.ConfigurationListener;
+import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.metrics.reporter.TajoMetricsScheduledReporter;
+
+import java.util.*;
+
+ /**
+  * Metrics holder that connects the metric registry of a Tajo daemon to the reporters configured in
+  * the metrics property file and registers the JVM metric sets.
+  *
+  * <p>The property file is read with the following key layout:
+  * <ul>
+  *   <li>{@code reporter.<name>=<reporter class name>} defines a reporter,</li>
+  *   <li>{@code <group>.reporters=<name>[,<name>...]} selects the reporters of a metrics group,</li>
+  *   <li>{@code <group>.<name>.<key>=<value>} passes a property to a reporter of that group.</li>
+  * </ul>
+  *
+  * <p>A background thread polls the configuration; once the file has been reloaded, every running
+  * reporter is closed and the reporters are re-created from the new content.
+  */
+ public class TajoSystemMetrics extends TajoMetrics {
+   private static final Log LOG = LogFactory.getLog(TajoSystemMetrics.class);
+
+   /** Loaded metrics properties; stays {@code null} when the property file could not be loaded. */
+   private PropertiesConfiguration metricsProps;
+
+   /** Polling thread; stays {@code null} when there is no configuration to poll. */
+   private Thread propertyChangeChecker;
+
+   private String hostAndPort;
+
+   /** Running reporters; every access is guarded by synchronizing on the list itself. */
+   private List<TajoMetricsScheduledReporter> metricsReporters = new ArrayList<TajoMetricsScheduledReporter>();
+
+   /** Set once the JVM metric sets have been registered, so a restart does not register them again. */
+   private boolean inited = false;
+
+   private String metricsPropertyFileName;
+
+   /**
+    * Loads the metrics property file named by {@code TajoConf.ConfVars.METRICS_PROPERTY_FILENAME}
+    * and starts the thread that polls it for changes.
+    *
+    * <p>A failure to load the file is logged and leaves this instance without reporters.
+    *
+    * @param tajoConf the configuration that names the metrics property file
+    * @param metricsGroupName the metrics group handled by this instance
+    * @param hostAndPort the host and port handed to every reporter
+    */
+   public TajoSystemMetrics(TajoConf tajoConf, String metricsGroupName, String hostAndPort) {
+     super(metricsGroupName);
+
+     this.hostAndPort = hostAndPort;
+     try {
+       this.metricsPropertyFileName = tajoConf.getVar(TajoConf.ConfVars.METRICS_PROPERTY_FILENAME);
+       this.metricsProps = new PropertiesConfiguration(metricsPropertyFileName);
+       this.metricsProps.addConfigurationListener(new MetricsReloadListener());
+       FileChangedReloadingStrategy reloadingStrategy = new FileChangedReloadingStrategy();
+       reloadingStrategy.setRefreshDelay(5 * 1000);
+       this.metricsProps.setReloadingStrategy(reloadingStrategy);
+     } catch (ConfigurationException e) {
+       LOG.warn(e.getMessage(), e);
+     }
+
+     if (metricsProps == null) {
+       // Loading failed: there is nothing to poll, and polling would only throw a NullPointerException.
+       return;
+     }
+
+     // PropertiesConfiguration fires configurationChanged only after a getXXX() call,
+     // so getXXX() has to be called periodically.
+     propertyChangeChecker = new Thread("TajoSystemMetrics-PropertyChangeChecker") {
+       @Override
+       public void run() {
+         while (!stop.get()) {
+           // The value is irrelevant; the read only triggers the reload check.
+           metricsProps.getString("reporter.file");
+           try {
+             Thread.sleep(10 * 1000);
+           } catch (InterruptedException e) {
+             // stop() interrupts this thread: keep the interrupt status and end the polling.
+             Thread.currentThread().interrupt();
+             break;
+           }
+         }
+       }
+     };
+
+     propertyChangeChecker.start();
+   }
+
+   /**
+    * Returns the reporters that are running at the time of the call.
+    *
+    * @return an unmodifiable snapshot; it is not affected by a later reload or stop
+    */
+   public Collection<TajoMetricsScheduledReporter> getMetricsReporters() {
+     synchronized (metricsReporters) {
+       // Copy under the lock: a view of the live list could be cleared while the caller iterates it.
+       return Collections.unmodifiableCollection(
+           new ArrayList<TajoMetricsScheduledReporter>(metricsReporters));
+     }
+   }
+
+   /**
+    * Stops the base metrics, interrupts the polling thread and closes every reporter.
+    */
+   @Override
+   public void stop() {
+     super.stop();
+     if(propertyChangeChecker != null) {
+       propertyChangeChecker.interrupt();
+     }
+     stopAndClearReporter();
+   }
+
+   /**
+    * Closes every running reporter and forgets it.
+    */
+   protected void stopAndClearReporter() {
+     synchronized(metricsReporters) {
+       for(TajoMetricsScheduledReporter eachReporter: metricsReporters) {
+         eachReporter.close();
+       }
+
+       metricsReporters.clear();
+     }
+   }
+
+   /**
+    * Starts the reporters of this metrics group and of its {@code -jvm} companion group and, on the
+    * first call only, registers the JVM metric sets (heap, file descriptors, GC, threads, log events).
+    */
+   public void start() {
+     setMetricsReporter(metricsGroupName);
+
+     String jvmMetricsName = metricsGroupName + "-jvm";
+     setMetricsReporter(jvmMetricsName);
+
+     if(!inited) {
+       metricRegistry.register(MetricRegistry.name(jvmMetricsName, "Heap"), new MemoryUsageGaugeSet());
+       metricRegistry.register(MetricRegistry.name(jvmMetricsName, "File"), new FileDescriptorRatioGauge());
+       metricRegistry.register(MetricRegistry.name(jvmMetricsName, "GC"), new GarbageCollectorMetricSet());
+       metricRegistry.register(MetricRegistry.name(jvmMetricsName, "Thread"), new ThreadStatesGaugeSet());
+       metricRegistry.register(MetricRegistry.name(jvmMetricsName, "Log"), new LogEventGaugeSet());
+     }
+     inited = true;
+   }
+
+   /**
+    * Creates, initializes and starts every reporter configured for the given metrics group.
+    *
+    * <p>A reporter that cannot be resolved or started is logged and skipped; the remaining
+    * reporters are still started.
+    *
+    * @param groupName the metrics group whose {@code <group>.reporters} property is evaluated
+    */
+   private void setMetricsReporter(String groupName) {
+     if (metricsProps == null) {
+       // The property file could not be loaded in the constructor.
+       LOG.warn("Metrics properties are not loaded from " + metricsPropertyFileName
+           + ", no metrics reporter is started for " + groupName);
+       return;
+     }
+
+     //reporter name -> class name
+     Map<String, String> reporters = new HashMap<String, String>();
+
+     List<String> reporterNames = metricsProps.getList(groupName + ".reporters");
+     if(reporterNames.isEmpty()) {
+       LOG.warn("No property " + groupName + ".reporters in " + metricsPropertyFileName);
+       return;
+     }
+
+     //full property key -> value, for all reporter properties of this group
+     Map<String, String> allReporterProperties = new HashMap<String, String>();
+
+     Iterator<String> keys = metricsProps.getKeys();
+     while (keys.hasNext()) {
+       String key = keys.next();
+       String value = metricsProps.getString(key);
+       if(key.startsWith("reporter.")) {
+         // reporter.<name>=<class name>
+         String[] tokens = key.split("\\.");
+         if(tokens.length == 2) {
+           reporters.put(tokens[1], value);
+         }
+       } else if(key.startsWith(groupName + ".")) {
+         // <group>.<reporter>.<property>=<value>
+         String[] tokens = key.split("\\.");
+         if(tokens.length > 2) {
+           allReporterProperties.put(key, value);
+         }
+       }
+     }
+
+     synchronized(metricsReporters) {
+       for(String eachReporterName: reporterNames) {
+         // The reporter name "null" is skipped without a warning.
+         if("null".equals(eachReporterName)) {
+           continue;
+         }
+         String reporterClass = reporters.get(eachReporterName);
+         if(reporterClass == null) {
+           LOG.warn("No metrics reporter definition[" + eachReporterName + "] in " + metricsPropertyFileName);
+           continue;
+         }
+
+         Map<String, String> eachMetricsReporterProperties = findMetircsProperties(allReporterProperties,
+             groupName + "." + eachReporterName);
+
+         try {
+           Object reporterObject = Class.forName(reporterClass).newInstance();
+           if(!(reporterObject instanceof TajoMetricsScheduledReporter)) {
+             LOG.warn(reporterClass + " is not subclass of " + TajoMetricsScheduledReporter.class.getCanonicalName());
+             continue;
+           }
+           TajoMetricsScheduledReporter reporter = (TajoMetricsScheduledReporter)reporterObject;
+           reporter.init(metricRegistry, groupName, hostAndPort, eachMetricsReporterProperties);
+           reporter.start();
+
+           metricsReporters.add(reporter);
+           LOG.info("Started metrics reporter " + reporter.getClass().getCanonicalName() + " for " + groupName);
+         } catch (ClassNotFoundException e) {
+           LOG.warn("No metrics reporter class[" + eachReporterName + "], required class= " + reporterClass);
+           continue;
+         } catch (Exception e) {
+           LOG.warn("Can't initiate metrics reporter class[" + eachReporterName + "]" + e.getMessage() , e);
+           continue;
+         }
+       }
+     }
+   }
+
+   /**
+    * Selects the properties whose key starts with the given prefix.
+    *
+    * @param allReporterProperties the candidate properties, keyed by their full property key
+    * @param findKey the key prefix to match
+    * @return a new map with the matching entries; the keys are kept unchanged
+    */
+   private Map<String, String> findMetircsProperties(Map<String, String> allReporterProperties, String findKey) {
+     Map<String, String> metricsProperties = new HashMap<String, String>();
+
+     for (Map.Entry<String, String> entry: allReporterProperties.entrySet()) {
+       String eachKey = entry.getKey();
+       if (eachKey.startsWith(findKey)) {
+         metricsProperties.put(eachKey, entry.getValue());
+       }
+     }
+     return metricsProperties;
+   }
+
+   /**
+    * Restarts the reporters after the metrics configuration has been updated.
+    */
+   class MetricsReloadListener implements ConfigurationListener {
+     @Override
+     public synchronized void configurationChanged(ConfigurationEvent event) {
+       // Each change is announced before and after the update; act on the completed update only.
+       if (!event.isBeforeUpdate()) {
+         stopAndClearReporter();
+         start();
+       }
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/GangliaReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/GangliaReporter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/GangliaReporter.java
new file mode 100644
index 0000000..b9acf0e
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/GangliaReporter.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+import info.ganglia.gmetric4j.gmetric.GMetric;
+import info.ganglia.gmetric4j.gmetric.GMetricSlope;
+import info.ganglia.gmetric4j.gmetric.GMetricType;
+import info.ganglia.gmetric4j.gmetric.GangliaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.SortedMap;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+ /**
+  * Scheduled reporter that announces every metric to Ganglia through a {@code GMetric} client.
+  *
+  * <p>The client is created in {@link #afterInit()} from the {@code server} and {@code port}
+  * properties of this reporter. When they are missing or the client cannot be created, the
+  * reporter stays without a client and {@code report} does nothing.
+  */
+ public class GangliaReporter extends TajoMetricsScheduledReporter {
+   private static final Logger LOG = LoggerFactory.getLogger(GangliaReporter.class);
+   public static final String REPORTER_NAME = "ganglia";
+
+   /** Ganglia client; {@code null} until {@link #afterInit()} has created it successfully. */
+   private GMetric ganglia;
+   /** Optional name prefix put in front of every announced metric name. */
+   private String prefix;
+   /** tMax value handed to every announce call. */
+   private int tMax = 60;
+   /** dMax value handed to every announce call. */
+   private int dMax = 0;
+
+   @Override
+   protected String getReporterName() {
+     return REPORTER_NAME;
+   }
+
+   /**
+    * Creates the Ganglia client from the {@code server} and {@code port} reporter properties.
+    * A missing property or a failing client construction is logged and leaves the client unset.
+    */
+   @Override
+   protected void afterInit() {
+     String server = metricsProperties.get(metricsPropertyKey + "server");
+     String port = metricsProperties.get(metricsPropertyKey + "port");
+
+     if(server == null || server.isEmpty()) {
+       LOG.warn("No " + metricsPropertyKey + "server property in tajo-metrics.properties");
+       return;
+     }
+
+     if(port == null || port.isEmpty()) {
+       LOG.warn("No " + metricsPropertyKey + "port property in tajo-metrics.properties");
+       return;
+     }
+
+     try {
+       ganglia = new GMetric(server, Integer.parseInt(port), GMetric.UDPAddressingMode.MULTICAST, 1);
+     } catch (Exception e) {
+       // Also covers a port value that is not a number.
+       LOG.warn(e.getMessage(), e);
+     }
+   }
+
+   public void setPrefix(String prefix) {
+     this.prefix = prefix;
+   }
+
+   public void settMax(int tMax) {
+     this.tMax = tMax;
+   }
+
+   public void setdMax(int dMax) {
+     this.dMax = dMax;
+   }
+
+   /**
+    * Announces all given metrics to Ganglia. Does nothing when no client could be created in
+    * {@link #afterInit()}.
+    */
+   @Override
+   public void report(SortedMap<String, Gauge> gauges,
+                      SortedMap<String, Counter> counters,
+                      SortedMap<String, Histogram> histograms,
+                      SortedMap<String, Meter> meters,
+                      SortedMap<String, Timer> timers) {
+     if (ganglia == null) {
+       // afterInit() already logged why there is no client; without this guard every
+       // announce call below would throw a NullPointerException.
+       return;
+     }
+
+     for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
+       reportGauge(entry.getKey(), entry.getValue());
+     }
+
+     for (Map.Entry<String, Counter> entry : counters.entrySet()) {
+       reportCounter(entry.getKey(), entry.getValue());
+     }
+
+     for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
+       reportHistogram(entry.getKey(), entry.getValue());
+     }
+
+     for (Map.Entry<String, Meter> entry : meters.entrySet()) {
+       reportMeter(entry.getKey(), entry.getValue());
+     }
+
+     for (Map.Entry<String, Timer> entry : timers.entrySet()) {
+       reportTimer(entry.getKey(), entry.getValue());
+     }
+   }
+
+   /** Announces the snapshot statistics (as converted durations) and the rates of a timer. */
+   private void reportTimer(String name, Timer timer) {
+     final String group = group(name);
+     try {
+       final Snapshot snapshot = timer.getSnapshot();
+
+       announce(prefix(name, "max"), group, convertDuration(snapshot.getMax()), getDurationUnit());
+       announce(prefix(name, "mean"), group, convertDuration(snapshot.getMean()), getDurationUnit());
+       announce(prefix(name, "min"), group, convertDuration(snapshot.getMin()), getDurationUnit());
+       announce(prefix(name, "stddev"), group, convertDuration(snapshot.getStdDev()), getDurationUnit());
+
+       announce(prefix(name, "p50"), group, convertDuration(snapshot.getMedian()), getDurationUnit());
+       announce(prefix(name, "p75"),
+           group,
+           convertDuration(snapshot.get75thPercentile()),
+           getDurationUnit());
+       announce(prefix(name, "p95"),
+           group,
+           convertDuration(snapshot.get95thPercentile()),
+           getDurationUnit());
+       announce(prefix(name, "p98"),
+           group,
+           convertDuration(snapshot.get98thPercentile()),
+           getDurationUnit());
+       announce(prefix(name, "p99"),
+           group,
+           convertDuration(snapshot.get99thPercentile()),
+           getDurationUnit());
+       announce(prefix(name, "p999"),
+           group,
+           convertDuration(snapshot.get999thPercentile()),
+           getDurationUnit());
+
+       reportMetered(name, timer, group, "calls");
+     } catch (GangliaException e) {
+       LOG.warn("Unable to report timer {}", name, e);
+     }
+   }
+
+   /** Announces the count and the rates of a meter. */
+   private void reportMeter(String name, Meter meter) {
+     final String group = group(name);
+     try {
+       reportMetered(name, meter, group, "events");
+     } catch (GangliaException e) {
+       LOG.warn("Unable to report meter {}", name, e);
+     }
+   }
+
+   /**
+    * Announces the count and the 1/5/15 minute and mean rates of a metered metric.
+    *
+    * @param eventName the unit of the count; the rate unit is {@code eventName/<rate unit>}
+    * @throws GangliaException if an announce call fails
+    */
+   private void reportMetered(String name, Metered meter, String group, String eventName) throws GangliaException {
+     final String unit = eventName + '/' + getRateUnit();
+     announce(prefix(name, "count"), group, meter.getCount(), eventName);
+     announce(prefix(name, "m1_rate"), group, convertRate(meter.getOneMinuteRate()), unit);
+     announce(prefix(name, "m5_rate"), group, convertRate(meter.getFiveMinuteRate()), unit);
+     announce(prefix(name, "m15_rate"), group, convertRate(meter.getFifteenMinuteRate()), unit);
+     announce(prefix(name, "mean_rate"), group, convertRate(meter.getMeanRate()), unit);
+   }
+
+   /** Announces the count and the snapshot statistics of a histogram, without a unit. */
+   private void reportHistogram(String name, Histogram histogram) {
+     final String group = group(name);
+     try {
+       final Snapshot snapshot = histogram.getSnapshot();
+
+       announce(prefix(name, "count"), group, histogram.getCount(), "");
+       announce(prefix(name, "max"), group, snapshot.getMax(), "");
+       announce(prefix(name, "mean"), group, snapshot.getMean(), "");
+       announce(prefix(name, "min"), group, snapshot.getMin(), "");
+       announce(prefix(name, "stddev"), group, snapshot.getStdDev(), "");
+       announce(prefix(name, "p50"), group, snapshot.getMedian(), "");
+       announce(prefix(name, "p75"), group, snapshot.get75thPercentile(), "");
+       announce(prefix(name, "p95"), group, snapshot.get95thPercentile(), "");
+       announce(prefix(name, "p98"), group, snapshot.get98thPercentile(), "");
+       announce(prefix(name, "p99"), group, snapshot.get99thPercentile(), "");
+       announce(prefix(name, "p999"), group, snapshot.get999thPercentile(), "");
+     } catch (GangliaException e) {
+       LOG.warn("Unable to report histogram {}", name, e);
+     }
+   }
+
+   /** Announces the current count of a counter. */
+   private void reportCounter(String name, Counter counter) {
+     final String group = group(name);
+     try {
+       announce(prefix(name, "count"), group, counter.getCount(), "");
+     } catch (GangliaException e) {
+       LOG.warn("Unable to report counter {}", name, e);
+     }
+   }
+
+   /** Announces the current value of a gauge with a Ganglia type derived from the value's class. */
+   private void reportGauge(String name, Gauge gauge) {
+     final String group = group(name);
+     final Object obj = gauge.getValue();
+
+     try {
+       ganglia.announce(name(prefix, name), String.valueOf(obj), detectType(obj), "",
+           GMetricSlope.BOTH, tMax, dMax, group);
+     } catch (GangliaException e) {
+       LOG.warn("Unable to report gauge {}", name, e);
+     }
+   }
+
+   /** Announces a floating point value as a Ganglia {@code DOUBLE}. */
+   private void announce(String name, String group, double value, String units) throws GangliaException {
+     ganglia.announce(name,
+         Double.toString(value),
+         GMetricType.DOUBLE,
+         units,
+         GMetricSlope.BOTH,
+         tMax,
+         dMax,
+         group);
+   }
+
+   /**
+    * Announces a long value. It is sent with the Ganglia type {@code DOUBLE}, the same type
+    * {@link #detectType(Object)} picks for {@code Long} gauge values.
+    */
+   private void announce(String name, String group, long value, String units) throws GangliaException {
+     final String v = Long.toString(value);
+     ganglia.announce(name,
+         v,
+         GMetricType.DOUBLE,
+         units,
+         GMetricSlope.BOTH,
+         tMax,
+         dMax,
+         group);
+   }
+
+   /**
+    * Maps the runtime class of a gauge value to a Ganglia metric type.
+    *
+    * @return the matching numeric type, or {@code STRING} for every other value including {@code null}
+    */
+   private GMetricType detectType(Object o) {
+     if (o instanceof Float) {
+       return GMetricType.FLOAT;
+     } else if (o instanceof Double) {
+       return GMetricType.DOUBLE;
+     } else if (o instanceof Byte) {
+       return GMetricType.INT8;
+     } else if (o instanceof Short) {
+       return GMetricType.INT16;
+     } else if (o instanceof Integer) {
+       return GMetricType.INT32;
+     } else if (o instanceof Long) {
+       // NOTE(review): presumably DOUBLE because no 64-bit integer type is used here - confirm.
+       return GMetricType.DOUBLE;
+     }
+     return GMetricType.STRING;
+   }
+
+   /**
+    * Derives the Ganglia group from a metric name.
+    *
+    * @return the first two dot separated tokens of the name joined by a dot, or an empty string
+    *         when the name has fewer than three tokens
+    */
+   private String group(String name) {
+     String[] tokens = name.split("\\.");
+     if(tokens.length < 3) {
+       return "";
+     }
+     return tokens[0] + "." + tokens[1];
+   }
+
+   /** Builds the announced name from the configured prefix, the metric name and a suffix. */
+   private String prefix(String name, String n) {
+     return name(prefix, name, n);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleReporter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleReporter.java
new file mode 100644
index 0000000..80b77f1
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleReporter.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+
+ /**
+  * Reporter that prints every metrics group as one line to standard output.
+  */
+ public class MetricsConsoleReporter extends TajoMetricsReporter {
+   /**
+    * Groups each kind of metric with {@code findMetricsItemGroup} and prints one formatted line
+    * per group. All lines of one call carry the same timestamp and no host information.
+    */
+   @Override
+   public void report(SortedMap<String, Gauge> gauges,
+                      SortedMap<String, Counter> counters,
+                      SortedMap<String, Histogram> histograms,
+                      SortedMap<String, Meter> meters,
+                      SortedMap<String, Timer> timers) {
+     final SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+     final String timestamp = timestampFormat.format(new Date());
+     final double perSecondFactor = TimeUnit.SECONDS.toSeconds(1);
+
+     // gauges
+     if (!gauges.isEmpty()) {
+       Map<String, Map<String, Gauge>> grouped = findMetricsItemGroup(gauges);
+       for (Map.Entry<String, Map<String, Gauge>> group : grouped.entrySet()) {
+         String line = gaugeGroupToString(timestamp, null, perSecondFactor, group.getKey(), group.getValue());
+         System.out.println(line);
+       }
+     }
+
+     // counters
+     if (!counters.isEmpty()) {
+       Map<String, Map<String, Counter>> grouped = findMetricsItemGroup(counters);
+       for (Map.Entry<String, Map<String, Counter>> group : grouped.entrySet()) {
+         String line = counterGroupToString(timestamp, null, perSecondFactor, group.getKey(), group.getValue());
+         System.out.println(line);
+       }
+     }
+
+     // histograms
+     if (!histograms.isEmpty()) {
+       Map<String, Map<String, Histogram>> grouped = findMetricsItemGroup(histograms);
+       for (Map.Entry<String, Map<String, Histogram>> group : grouped.entrySet()) {
+         String line = histogramGroupToString(timestamp, null, perSecondFactor, group.getKey(), group.getValue());
+         System.out.println(line);
+       }
+     }
+
+     // meters
+     if (!meters.isEmpty()) {
+       Map<String, Map<String, Meter>> grouped = findMetricsItemGroup(meters);
+       for (Map.Entry<String, Map<String, Meter>> group : grouped.entrySet()) {
+         String line = meterGroupToString(timestamp, null, perSecondFactor, group.getKey(), group.getValue());
+         System.out.println(line);
+       }
+     }
+
+     // timers
+     if (!timers.isEmpty()) {
+       Map<String, Map<String, Timer>> grouped = findMetricsItemGroup(timers);
+       for (Map.Entry<String, Map<String, Timer>> group : grouped.entrySet()) {
+         String line = timerGroupToString(timestamp, null, perSecondFactor, group.getKey(), group.getValue());
+         System.out.println(line);
+       }
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleScheduledReporter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleScheduledReporter.java
new file mode 100644
index 0000000..286ef8d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsConsoleScheduledReporter.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+ /**
+  * Scheduled stream reporter that writes the metrics to standard output.
+  */
+ public class MetricsConsoleScheduledReporter extends MetricsStreamScheduledReporter {
+   public static final String REPORTER_NAME = "console";
+
+   @Override
+   protected String getReporterName() {
+     return REPORTER_NAME;
+   }
+
+   /**
+    * Installs standard output as the target of this reporter.
+    *
+    * <p>The stream is wrapped so that closing the reporter only flushes it: the inherited
+    * {@code close()} closes the output stream, and closing {@code System.out} itself would
+    * silence standard output for the whole JVM.
+    */
+   @Override
+   protected void afterInit() {
+     setOutput(new java.io.FilterOutputStream(System.out) {
+       @Override
+       public void write(byte[] b, int off, int len) throws java.io.IOException {
+         // Forward whole chunks; the inherited implementation writes byte by byte.
+         out.write(b, off, len);
+       }
+
+       @Override
+       public void close() throws java.io.IOException {
+         // Keep System.out open.
+         flush();
+       }
+     });
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
new file mode 100644
index 0000000..35dd6f1
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+
+ /**
+  * Scheduled stream reporter that appends the metrics to the file named by the
+  * {@code filename} property of this reporter.
+  */
+ public class MetricsFileScheduledReporter extends MetricsStreamScheduledReporter {
+   private static final Log LOG = LogFactory.getLog(MetricsFileScheduledReporter.class);
+   public static final String REPORTER_NAME = "file";
+
+   @Override
+   protected String getReporterName() {
+     return REPORTER_NAME;
+   }
+
+   /**
+    * Opens the metrics file in append mode, creating its parent directories when necessary, and
+    * clears the date format of this reporter.
+    *
+    * <p>A missing or empty {@code filename} property, or a file that cannot be opened, is logged
+    * and leaves the reporter without an output.
+    */
+   @Override
+   protected void afterInit() {
+     String fileName = metricsProperties.get(metricsPropertyKey + "filename");
+     if(fileName == null || fileName.isEmpty()) {
+       LOG.warn("No " + metricsPropertyKey + "filename property in tajo-metrics.properties");
+       return;
+     }
+     try {
+       File file = new File(fileName);
+       File parentFile = file.getParentFile();
+       if(parentFile != null && !parentFile.exists()) {
+         if(!parentFile.mkdirs()) {
+           LOG.warn("Can't create dir for tajo metrics:" + parentFile.getAbsolutePath());
+         }
+       }
+       this.setOutput(new FileOutputStream(fileName, true));
+       // Without a date format the stream reporter prints the raw clock time.
+       this.setDateFormat(null);
+     } catch (FileNotFoundException e) {
+       // Log the cause as well: it tells whether the path is a directory, unwritable, ...
+       LOG.warn("Can't open metrics file:" + fileName, e);
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java
new file mode 100644
index 0000000..4fbefd7
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsStreamScheduledReporter.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+import com.codahale.metrics.Timer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+ /**
+  * Base class of the scheduled reporters that write every metrics group as one text line to an
+  * {@link OutputStream}.
+  *
+  * <p>The output stream is expected to be installed with {@link #setOutput(OutputStream)}; as long
+  * as none is set, {@code report} does nothing.
+  */
+ public abstract class MetricsStreamScheduledReporter extends TajoMetricsScheduledReporter {
+   private static final Log LOG = LogFactory.getLog(MetricsStreamScheduledReporter.class);
+
+   /** Target of the report lines; {@code null} until a subclass or caller sets it. */
+   protected OutputStream output;
+   protected Locale locale;
+   protected Clock clock;
+   protected TimeZone timeZone;
+   protected MetricFilter filter;
+   /** Format of the line timestamp; {@code null} makes {@code report} print the raw clock time. */
+   protected DateFormat dateFormat;
+
+   private static final byte[] NEW_LINE = "\n".getBytes();
+
+   public MetricsStreamScheduledReporter() {
+     dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+     clock = Clock.defaultClock();
+   }
+
+   public void setOutput(OutputStream output) {
+     this.output = output;
+   }
+
+   public void setLocale(Locale locale) {
+     this.locale = locale;
+   }
+
+   public void setClock(Clock clock) {
+     this.clock = clock;
+   }
+
+   /**
+    * Stores the time zone and applies it to the current date format, if there is one.
+    */
+   public void setTimeZone(TimeZone timeZone) {
+     // The date format may have been cleared with setDateFormat(null).
+     if (this.dateFormat != null) {
+       this.dateFormat.setTimeZone(timeZone);
+     }
+     this.timeZone = timeZone;
+   }
+
+   /**
+    * Replaces the date format of the line timestamp.
+    *
+    * @param dateFormat the new format, or {@code null} to print the raw clock time
+    */
+   public void setDateFormat(DateFormat dateFormat) {
+     this.dateFormat = dateFormat;
+   }
+
+   /**
+    * Groups each kind of metric with {@code findMetricsItemGroup}, writes one line per group to
+    * the output stream and flushes the stream. Does nothing when no output stream is set.
+    */
+   @Override
+   public void report(SortedMap<String, Gauge> gauges,
+                      SortedMap<String, Counter> counters,
+                      SortedMap<String, Histogram> histograms,
+                      SortedMap<String, Meter> meters,
+                      SortedMap<String, Timer> timers) {
+     if (output == null) {
+       // No stream was installed, so there is nowhere to write to.
+       return;
+     }
+
+     final String dateTime = dateFormat == null ? "" + clock.getTime() : dateFormat.format(new Date(clock.getTime()));
+
+     if (!gauges.isEmpty()) {
+       Map<String, Map<String, Gauge>> gaugeGroups = findMetricsItemGroup(gauges);
+
+       for(Map.Entry<String, Map<String, Gauge>> eachGroup: gaugeGroups.entrySet()) {
+         printGaugeGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+       }
+     }
+
+     if (!counters.isEmpty()) {
+       Map<String, Map<String, Counter>> counterGroups = findMetricsItemGroup(counters);
+
+       for(Map.Entry<String, Map<String, Counter>> eachGroup: counterGroups.entrySet()) {
+         printCounterGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+       }
+     }
+
+     if (!histograms.isEmpty()) {
+       Map<String, Map<String, Histogram>> histogramGroups = findMetricsItemGroup(histograms);
+
+       for(Map.Entry<String, Map<String, Histogram>> eachGroup: histogramGroups.entrySet()) {
+         printHistogramGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+       }
+     }
+
+     if (!meters.isEmpty()) {
+       Map<String, Map<String, Meter>> meterGroups = findMetricsItemGroup(meters);
+
+       for(Map.Entry<String, Map<String, Meter>> eachGroup: meterGroups.entrySet()) {
+         printMeterGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+       }
+     }
+
+     if (!timers.isEmpty()) {
+       Map<String, Map<String, Timer>> timerGroups = findMetricsItemGroup(timers);
+
+       for(Map.Entry<String, Map<String, Timer>> eachGroup: timerGroups.entrySet()) {
+         printTimerGroup(dateTime, eachGroup.getKey(), eachGroup.getValue());
+       }
+     }
+     try {
+       output.flush();
+     } catch (IOException e) {
+       LOG.warn("Can't flush metrics output: " + e.getMessage(), e);
+     }
+   }
+
+   /** Writes one report line followed by a line feed to the output stream. */
+   private void writeLine(String line) throws IOException {
+     output.write(line.getBytes());
+     output.write(NEW_LINE);
+   }
+
+   /** Writes the line of one meter group; any failure is logged and does not stop the report. */
+   private void printMeterGroup(String dateTime, String groupName, Map<String, Meter> meters) {
+     try {
+       writeLine(meterGroupToString(dateTime, hostAndPort, rateFactor, groupName, meters));
+     } catch (Exception e) {
+       LOG.warn(e.getMessage(), e);
+     }
+   }
+
+   /** Writes the line of one counter group; any failure is logged and does not stop the report. */
+   private void printCounterGroup(String dateTime, String groupName, Map<String, Counter> counters) {
+     try {
+       writeLine(counterGroupToString(dateTime, hostAndPort, rateFactor, groupName, counters));
+     } catch (Exception e) {
+       LOG.warn(e.getMessage(), e);
+     }
+   }
+
+   /** Writes the line of one gauge group; any failure is logged and does not stop the report. */
+   private void printGaugeGroup(String dateTime, String groupName, Map<String, Gauge> gauges) {
+     try {
+       writeLine(gaugeGroupToString(dateTime, hostAndPort, rateFactor, groupName, gauges));
+     } catch (Exception e) {
+       LOG.warn(e.getMessage(), e);
+     }
+   }
+
+   /** Writes the line of one histogram group; any failure is logged and does not stop the report. */
+   private void printHistogramGroup(String dateTime, String groupName, Map<String, Histogram> histograms) {
+     try {
+       writeLine(histogramGroupToString(dateTime, hostAndPort, rateFactor, groupName, histograms));
+     } catch (Exception e) {
+       LOG.warn(e.getMessage(), e);
+     }
+   }
+
+   /** Writes the line of one timer group; any failure is logged and does not stop the report. */
+   private void printTimerGroup(String dateTime, String groupName, Map<String, Timer> timers) {
+     try {
+       writeLine(timerGroupToString(dateTime, hostAndPort, rateFactor, groupName, timers));
+     } catch (Exception e) {
+       LOG.warn(e.getMessage(), e);
+     }
+   }
+
+   /**
+    * Closes the output stream, if one is set, and then closes the reporter itself.
+    */
+   @Override
+   public void close() {
+     if(output != null) {
+       try {
+         output.close();
+       } catch (IOException e) {
+         LOG.warn("Can't close metrics output: " + e.getMessage(), e);
+       }
+     }
+
+     super.close();
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/NullReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/NullReporter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/NullReporter.java
new file mode 100644
index 0000000..9dc1755
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/NullReporter.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+
+import java.util.SortedMap;
+
+ /**
+  * A {@link TajoMetricsReporter} whose {@code report} implementation does nothing,
+  * so every metric handed to it is discarded.
+  */
+ public class NullReporter extends TajoMetricsReporter {
+   /**
+    * Ignores all supplied metrics.
+    *
+    * @param gauges     gauges to report (ignored)
+    * @param counters   counters to report (ignored)
+    * @param histograms histograms to report (ignored)
+    * @param meters     meters to report (ignored)
+    * @param timers     timers to report (ignored)
+    */
+   @Override
+   public void report(
+       SortedMap<String, Gauge> gauges,
+       SortedMap<String, Counter> counters,
+       SortedMap<String, Histogram> histograms,
+       SortedMap<String, Meter> meters,
+       SortedMap<String, Timer> timers) {
+     // Intentionally empty: this reporter produces no output.
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/62c49c05/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java
new file mode 100644
index 0000000..a32a913
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsReporter.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.metrics.reporter;
+
+import com.codahale.metrics.*;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+
+ /**
+  * Base class for metric reporters. Besides the abstract {@link #report} hook it
+  * offers helpers that split metric names into groups and that render one group
+  * of metrics as a single text line of the form
+  * {@code <dateTime> [<hostAndPort>] <type> [<groupName>] <item>,<item>,...}.
+  */
+ public abstract class TajoMetricsReporter {
+   /**
+    * Reports the given metrics.
+    *
+    * @param gauges     gauges keyed by metric name
+    * @param counters   counters keyed by metric name
+    * @param histograms histograms keyed by metric name
+    * @param meters     meters keyed by metric name
+    * @param timers     timers keyed by metric name
+    */
+   public abstract void report(SortedMap<String, Gauge> gauges,
+                               SortedMap<String, Counter> counters,
+                               SortedMap<String, Histogram> histograms,
+                               SortedMap<String, Meter> meters,
+                               SortedMap<String, Timer> timers);
+
+   /**
+    * Splits metrics into groups according to their dot-separated names.
+    * <p>
+    * A name with more than two tokens belongs to the group made of its first two
+    * tokens ({@code "a.b"}); its item name is the remaining tokens joined with
+    * {@code "."}. A name with one or two tokens is put, under its full name, into
+    * the group named by the empty string.
+    *
+    * @param metricsMap metrics keyed by their full name
+    * @param <T>        metric type
+    * @return map from group name to the items of that group (item name to metric);
+    *         empty when {@code metricsMap} is empty
+    */
+   public <T> Map<String, Map<String, T>> findMetricsItemGroup(SortedMap<String, T> metricsMap) {
+     Map<String, Map<String, T>> metricsGroup = new HashMap<String, Map<String, T>>();
+
+     for (Map.Entry<String, T> entry : metricsMap.entrySet()) {
+       String key = entry.getKey();
+       String[] keyTokens = key.split("\\.");
+
+       String groupName;
+       String itemName;
+       if (keyTokens.length > 2) {
+         groupName = keyTokens[0] + "." + keyTokens[1];
+         StringBuilder itemBuilder = new StringBuilder(keyTokens[2]);
+         for (int i = 3; i < keyTokens.length; i++) {
+           itemBuilder.append('.').append(keyTokens[i]);
+         }
+         itemName = itemBuilder.toString();
+       } else {
+         groupName = "";
+         itemName = key;
+       }
+
+       // Look the group up on every entry instead of tracking the "previous"
+       // group, so the result does not depend on equal groups being adjacent.
+       Map<String, T> groupItems = metricsGroup.get(groupName);
+       if (groupItems == null) {
+         groupItems = new HashMap<String, T>();
+         metricsGroup.put(groupName, groupItems);
+       }
+       groupItems.put(itemName, entry.getValue());
+     }
+
+     return metricsGroup;
+   }
+
+   /**
+    * Renders a group of meters as one line. Each meter contributes
+    * {@code count}, {@code mean}, {@code 1minute}, {@code 5minute} and
+    * {@code 15minute} fields separated by {@code "|"}; meters are separated by
+    * {@code ","}. Rates are passed through {@link #convertRate}.
+    *
+    * @param dateTime    timestamp text placed at the start of the line
+    * @param hostAndPort host/port text; omitted when null or empty
+    * @param rateFactor  factor handed to {@link #convertRate} for every rate
+    * @param groupName   group name; omitted when null or empty
+    * @param meters      meters of the group, keyed by item name
+    * @return the formatted line, without a line terminator
+    */
+   protected String meterGroupToString(String dateTime, String hostAndPort, double rateFactor,
+                                       String groupName, Map<String, Meter> meters) {
+     StringBuilder sb = beginLine(dateTime, hostAndPort, "meter", groupName);
+
+     String prefix = "";
+     for (Map.Entry<String, Meter> eachMeter : meters.entrySet()) {
+       String key = eachMeter.getKey();
+       Meter meter = eachMeter.getValue();
+       sb.append(prefix);
+       sb.append(key).append(".count=").append(meter.getCount()).append("|");
+       sb.append(key).append(".mean=").append(formatScaled(meter.getMeanRate(), rateFactor)).append("|");
+       sb.append(key).append(".1minute=").append(formatScaled(meter.getOneMinuteRate(), rateFactor)).append("|");
+       sb.append(key).append(".5minute=").append(formatScaled(meter.getFiveMinuteRate(), rateFactor)).append("|");
+       sb.append(key).append(".15minute=").append(formatScaled(meter.getFifteenMinuteRate(), rateFactor));
+       prefix = ",";
+     }
+
+     return sb.toString();
+   }
+
+   /**
+    * Renders a group of counters as one line of {@code name=count} items
+    * separated by {@code ","}.
+    *
+    * @param dateTime    timestamp text placed at the start of the line
+    * @param hostAndPort host/port text; omitted when null or empty
+    * @param rateFactor  not used for counters
+    * @param groupName   group name; omitted when null or empty
+    * @param counters    counters of the group, keyed by item name
+    * @return the formatted line, without a line terminator
+    */
+   protected String counterGroupToString(String dateTime, String hostAndPort, double rateFactor,
+                                         String groupName, Map<String, Counter> counters) {
+     StringBuilder sb = beginLine(dateTime, hostAndPort, "counter", groupName);
+
+     String prefix = "";
+     for (Map.Entry<String, Counter> eachCounter : counters.entrySet()) {
+       sb.append(prefix);
+       sb.append(eachCounter.getKey()).append("=").append(eachCounter.getValue().getCount());
+       prefix = ",";
+     }
+     return sb.toString();
+   }
+
+   /**
+    * Renders a group of gauges as one line of {@code name=value} items
+    * separated by {@code ","}.
+    *
+    * @param dateTime    timestamp text placed at the start of the line
+    * @param hostAndPort host/port text; omitted when null or empty
+    * @param rateFactor  not used for gauges
+    * @param groupName   group name; omitted when null or empty
+    * @param gauges      gauges of the group, keyed by item name
+    * @return the formatted line, without a line terminator
+    */
+   protected String gaugeGroupToString(String dateTime, String hostAndPort, double rateFactor,
+                                       String groupName, Map<String, Gauge> gauges) {
+     // The type tag is spelled "guage" in the emitted line; it is kept unchanged
+     // because existing consumers of the output may match on it.
+     StringBuilder sb = beginLine(dateTime, hostAndPort, "guage", groupName);
+
+     String prefix = "";
+     for (Map.Entry<String, Gauge> eachGauge : gauges.entrySet()) {
+       sb.append(prefix).append(eachGauge.getKey()).append("=").append(eachGauge.getValue().getValue());
+       prefix = ",";
+     }
+     return sb.toString();
+   }
+
+   /**
+    * Renders a group of histograms as one line. Each histogram contributes its
+    * count and snapshot statistics (min, max, mean, stddev, median and the
+    * 75/95/98/99/99.9 percentiles) separated by {@code "|"}; histograms are
+    * separated by {@code ","}. The group name is written after the type tag,
+    * as for the other metric types.
+    *
+    * @param dateTime    timestamp text placed at the start of the line
+    * @param hostAndPort host/port text; omitted when null or empty
+    * @param rateFactor  not used for histograms
+    * @param groupName   group name; omitted when null or empty
+    * @param histograms  histograms of the group, keyed by item name
+    * @return the formatted line, without a line terminator
+    */
+   protected String histogramGroupToString(String dateTime, String hostAndPort, double rateFactor,
+                                           String groupName, Map<String, Histogram> histograms) {
+     StringBuilder sb = beginLine(dateTime, hostAndPort, "histo", groupName);
+
+     String prefix = "";
+     for (Map.Entry<String, Histogram> eachHistogram : histograms.entrySet()) {
+       String key = eachHistogram.getKey();
+       Histogram histogram = eachHistogram.getValue();
+       sb.append(prefix);
+       sb.append(key).append(".count=").append(histogram.getCount()).append("|");
+
+       Snapshot snapshot = histogram.getSnapshot();
+
+       sb.append(key).append(".min=").append(snapshot.getMin()).append("|");
+       sb.append(key).append(".max=").append(snapshot.getMax()).append("|");
+       sb.append(key).append(".mean=").append(formatDecimal(snapshot.getMean())).append("|");
+       sb.append(key).append(".stddev=").append(formatDecimal(snapshot.getStdDev())).append("|");
+       sb.append(key).append(".median=").append(formatDecimal(snapshot.getMedian())).append("|");
+       sb.append(key).append(".75%=").append(formatDecimal(snapshot.get75thPercentile())).append("|");
+       sb.append(key).append(".95%=").append(formatDecimal(snapshot.get95thPercentile())).append("|");
+       sb.append(key).append(".98%=").append(formatDecimal(snapshot.get98thPercentile())).append("|");
+       sb.append(key).append(".99%=").append(formatDecimal(snapshot.get99thPercentile())).append("|");
+       sb.append(key).append(".999%=").append(formatDecimal(snapshot.get999thPercentile()));
+       prefix = ",";
+     }
+     return sb.toString();
+   }
+
+   /**
+    * Renders a group of timers as one line. Each timer contributes its count,
+    * its rates and its snapshot statistics separated by {@code "|"}; timers are
+    * separated by {@code ","}. Every numeric value except the count is passed
+    * through {@link #convertRate}.
+    *
+    * @param dateTime    timestamp text placed at the start of the line
+    * @param hostAndPort host/port text; omitted when null or empty
+    * @param rateFactor  factor handed to {@link #convertRate}
+    * @param groupName   group name; omitted when null or empty
+    * @param timers      timers of the group, keyed by item name
+    * @return the formatted line, without a line terminator
+    */
+   protected String timerGroupToString(String dateTime, String hostAndPort, double rateFactor,
+                                       String groupName, Map<String, Timer> timers) {
+     StringBuilder sb = beginLine(dateTime, hostAndPort, "timer", groupName);
+
+     String prefix = "";
+     for (Map.Entry<String, Timer> eachTimer : timers.entrySet()) {
+       String key = eachTimer.getKey();
+       Timer timer = eachTimer.getValue();
+       Snapshot snapshot = timer.getSnapshot();
+
+       sb.append(prefix);
+       sb.append(key).append(".count=").append(timer.getCount()).append("|");
+       sb.append(key).append(".meanrate=").append(formatScaled(timer.getMeanRate(), rateFactor)).append("|");
+       sb.append(key).append(".1minuterate=").append(formatScaled(timer.getOneMinuteRate(), rateFactor)).append("|");
+       sb.append(key).append(".5minuterate=").append(formatScaled(timer.getFiveMinuteRate(), rateFactor)).append("|");
+       sb.append(key).append(".15minuterate=").append(formatScaled(timer.getFifteenMinuteRate(), rateFactor)).append("|");
+       // NOTE(review): the snapshot values below are scaled with the same rate
+       // factor as the rates above - confirm this is the intended conversion.
+       sb.append(key).append(".min=").append(formatScaled(snapshot.getMin(), rateFactor)).append("|");
+       sb.append(key).append(".max=").append(formatScaled(snapshot.getMax(), rateFactor)).append("|");
+       sb.append(key).append(".mean=").append(formatScaled(snapshot.getMean(), rateFactor)).append("|");
+       sb.append(key).append(".stddev=").append(formatScaled(snapshot.getStdDev(), rateFactor)).append("|");
+       sb.append(key).append(".median=").append(formatScaled(snapshot.getMedian(), rateFactor)).append("|");
+       sb.append(key).append(".75%=").append(formatScaled(snapshot.get75thPercentile(), rateFactor)).append("|");
+       sb.append(key).append(".95%=").append(formatScaled(snapshot.get95thPercentile(), rateFactor)).append("|");
+       sb.append(key).append(".98%=").append(formatScaled(snapshot.get98thPercentile(), rateFactor)).append("|");
+       sb.append(key).append(".99%=").append(formatScaled(snapshot.get99thPercentile(), rateFactor)).append("|");
+       sb.append(key).append(".999%=").append(formatScaled(snapshot.get999thPercentile(), rateFactor));
+       prefix = ",";
+     }
+
+     return sb.toString();
+   }
+
+   /**
+    * Scales a rate.
+    *
+    * @param rate       value to scale
+    * @param rateFactor multiplier
+    * @return {@code rate * rateFactor}
+    */
+   protected double convertRate(double rate, double rateFactor) {
+     return rate * rateFactor;
+   }
+
+   /**
+    * Starts a metric line: {@code <dateTime> [<hostAndPort>] <metricType> [<groupName>] }.
+    * The bracketed parts are skipped when null or empty.
+    */
+   private static StringBuilder beginLine(String dateTime, String hostAndPort,
+                                          String metricType, String groupName) {
+     StringBuilder sb = new StringBuilder();
+     sb.append(dateTime).append(" ");
+     if (hostAndPort != null && !hostAndPort.isEmpty()) {
+       sb.append(hostAndPort).append(" ");
+     }
+     sb.append(metricType).append(" ");
+     if (groupName != null && !groupName.isEmpty()) {
+       sb.append(groupName).append(" ");
+     }
+     return sb;
+   }
+
+   /**
+    * Formats a value with two fraction digits. A fixed locale is used so the
+    * decimal separator is always {@code "."} and can never collide with the
+    * {@code ","} that separates items on a line.
+    */
+   private static String formatDecimal(double value) {
+     return String.format(java.util.Locale.US, "%2.2f", value);
+   }
+
+   /** Applies {@link #convertRate} to the value and formats the result with two fraction digits. */
+   private String formatScaled(double value, double rateFactor) {
+     return formatDecimal(convertRate(value, rateFactor));
+   }
+
+ }