You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/10/15 09:02:10 UTC
[2/2] kylin git commit: KYLIN-2722 Introduce a new measure for
dropwizard metrics framework, called active reservoir,
for actively pushing metrics to reporters
KYLIN-2722 Introduce a new measure for dropwizard metrics framework, called active reservoir, for actively pushing metrics to reporters
Signed-off-by: Li Yang <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/75bd5b61
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/75bd5b61
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/75bd5b61
Branch: refs/heads/pr77
Commit: 75bd5b61641547a58cb40fb4b498e9b110b120c2
Parents: 8891b1c
Author: Zhong <nj...@apache.org>
Authored: Tue Aug 8 22:50:54 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Sun Oct 15 15:49:23 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 10 +-
core-metrics/pom.xml | 51 ++++
.../kylin/metrics/lib/ActiveReservoir.java | 40 +++
.../metrics/lib/ActiveReservoirFilter.java | 44 +++
.../metrics/lib/ActiveReservoirListener.java | 30 ++
.../metrics/lib/ActiveReservoirReporter.java | 51 ++++
.../org/apache/kylin/metrics/lib/Record.java | 51 ++++
.../java/org/apache/kylin/metrics/lib/Sink.java | 23 ++
.../lib/impl/AbstractActiveReservoir.java | 68 +++++
.../metrics/lib/impl/BaseScheduledReporter.java | 103 +++++++
.../metrics/lib/impl/BlockingReservoir.java | 167 ++++++++++++
.../metrics/lib/impl/InstantReservoir.java | 76 ++++++
.../kylin/metrics/lib/impl/MetricsSystem.java | 164 +++++++++++
.../kylin/metrics/lib/impl/RecordEvent.java | 272 +++++++++++++++++++
.../metrics/lib/impl/RecordEventTimeDetail.java | 77 ++++++
.../metrics/lib/impl/RecordEventWrapper.java | 61 +++++
.../kylin/metrics/lib/impl/ReporterBuilder.java | 48 ++++
.../kylin/metrics/lib/impl/StubReservoir.java | 54 ++++
.../metrics/lib/impl/StubReservoirReporter.java | 51 ++++
.../apache/kylin/metrics/lib/impl/StubSink.java | 30 ++
.../metrics/lib/impl/TimePropertyEnum.java | 49 ++++
metrics-reporter-hive/pom.xml | 53 ++++
.../metrics/lib/impl/hive/HiveProducer.java | 200 ++++++++++++++
.../lib/impl/hive/HiveProducerRecord.java | 196 +++++++++++++
.../lib/impl/hive/HiveReservoirReporter.java | 139 ++++++++++
.../kylin/metrics/lib/impl/hive/HiveSink.java | 30 ++
metrics-reporter-kafka/pom.xml | 46 ++++
.../kafka/KafkaActiveReserviorListener.java | 115 ++++++++
.../lib/impl/kafka/KafkaReservoirReporter.java | 139 ++++++++++
.../kylin/metrics/lib/impl/kafka/KafkaSink.java | 29 ++
pom.xml | 15 +
31 files changed, 2479 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 34d8b7c..f3cf6c0 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1295,9 +1295,9 @@ abstract public class KylinConfigBase implements Serializable {
return Integer.valueOf(this.getOptional("kylin.restclient.connection.max-total", "200"));
}
- /**
- * metric
- */
+ // ============================================================================
+ // Metrics
+ // ============================================================================
public String getCoadhaleMetricsReportClassesNames() {
return getOptional("kylin.metrics.reporter-classes",
"org.apache.kylin.common.metrics.metrics2.JsonFileMetricsReporter,org.apache.kylin.common.metrics.metrics2.JmxMetricsReporter");
@@ -1315,4 +1315,8 @@ abstract public class KylinConfigBase implements Serializable {
return getOptional("kylin.metrics.perflogger-class", "org.apache.kylin.common.metrics.perflog.PerfLogger");
}
+ public String getMetricsActiveReservoirDefaultClass() {
+ return getOptional("kylin.metrics.active-reservoir-default-class",
+ "org.apache.kylin.metrics.lib.impl.StubReservoir");
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/core-metrics/pom.xml b/core-metrics/pom.xml
new file mode 100644
index 0000000..454a1ff
--- /dev/null
+++ b/core-metrics/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>kylin-core-metrics</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache Kylin - Core Metrics</name>
+ <description>Apache Kylin - Core Metrics</description>
+
+ <parent>
+ <artifactId>kylin</artifactId>
+ <groupId>org.apache.kylin</groupId>
+ <version>2.2.0-SNAPSHOT</version>
+ </parent>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-common</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java
new file mode 100644
index 0000000..36ab759
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib;
+
+import java.io.Closeable;
+
+public interface ActiveReservoir extends Closeable {
+
+ int size();
+
+ void update(Record record);
+
+ void addListener(ActiveReservoirListener listener);
+
+ void removeListener(ActiveReservoirListener listener);
+
+ void removeAllListener();
+
+ void setHAListener(ActiveReservoirListener listener);
+
+ void start();
+
+ void stop();
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java
new file mode 100644
index 0000000..5cffcfc
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib;
+
+/**
+ * A filter used to determine whether or not an active reservoir should be reported, among other things.
+ */
+public interface ActiveReservoirFilter {
+
+ /**
+ * Matches all active reservoirs, regardless of type or name.
+ */
+ ActiveReservoirFilter ALL = new ActiveReservoirFilter() {
+ @Override
+ public boolean matches(String name, ActiveReservoir activeReservoir) {
+ return true;
+ }
+ };
+
+ /**
+ * Returns {@code true} if the active reservoir matches the filter; {@code false} otherwise.
+ *
+ * @param name the active reservoir's name
+ * @param activeReservoir the active reservoir
+ * @return {@code true} if the active reservoir matches the filter
+ */
+ boolean matches(String name, ActiveReservoir activeReservoir);
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirListener.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirListener.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirListener.java
new file mode 100644
index 0000000..f64caba
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirListener.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib;
+
+import java.io.Closeable;
+import java.util.EventListener;
+import java.util.List;
+
+public interface ActiveReservoirListener extends EventListener, Closeable {
+
+ boolean onRecordUpdate(final List<Record> records);
+
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java
new file mode 100644
index 0000000..6020865
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib;
+
+import java.io.Closeable;
+import java.util.regex.Pattern;
+
+import org.apache.kylin.common.util.Pair;
+
+import com.google.common.base.Strings;
+
+public abstract class ActiveReservoirReporter implements Closeable {
+
+ public static final String KYLIN_PREFIX = "KYLIN";
+
+ public static Pair<String, String> getTableNameSplits(String tableName) {
+ if (Strings.isNullOrEmpty(tableName)) {
+ return null;
+ }
+
+ String[] splits = tableName.split(Pattern.quote("."));
+ int i = 0;
+ String database = splits.length == 1 ? KYLIN_PREFIX : splits[i++];
+ String tableNameOnly = splits[i];
+ return new Pair(database, tableNameOnly);
+ }
+
+ public static String getTableName(Pair<String, String> tableNameSplits) {
+ return tableNameSplits.getFirst() + "." + tableNameSplits.getSecond();
+ }
+
+ public abstract void start();
+
+ public abstract void stop();
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java
new file mode 100644
index 0000000..a1bce1f
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib;
+
+import java.util.Map;
+
+public interface Record {
+
+ /**
+ * For classification
+ */
+ String getType();
+
+ /**
+ * For keep ordering in the same category
+ */
+ byte[] getKey();
+
+ /**
+ * For the contents will be used
+ */
+ byte[] getValue();
+
+ /**
+ * For the raw contents will be used
+ */
+ Map<String, Object> getValueRaw();
+
+ /**
+ * For the timestamp the record created
+ */
+ Long getTime();
+
+ Record clone();
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java
new file mode 100644
index 0000000..dff71bd
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib;
+
+public interface Sink {
+ String getTableFromSubject(String subject);
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/AbstractActiveReservoir.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/AbstractActiveReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/AbstractActiveReservoir.java
new file mode 100644
index 0000000..cc72710
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/AbstractActiveReservoir.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.util.List;
+
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+
+import com.google.common.collect.Lists;
+
+public abstract class AbstractActiveReservoir implements ActiveReservoir {
+
+ protected List<ActiveReservoirListener> listeners = Lists.newArrayList();
+
+ protected ActiveReservoirListener listenerHA = new StubReservoirReporter().listener;
+
+ protected boolean isReady = false;
+
+ public void addListener(ActiveReservoirListener listener) {
+ listeners.add(listener);
+ }
+
+ public void removeListener(ActiveReservoirListener listener) {
+ listener.close();
+ listeners.remove(listener);
+ }
+
+ public void removeAllListener() {
+ for (ActiveReservoirListener listener : listeners) {
+ listener.close();
+ }
+ listeners.clear();
+ }
+
+ public void setHAListener(ActiveReservoirListener listener) {
+ this.listenerHA = listener;
+ }
+
+ public void start() {
+ isReady = true;
+ }
+
+ public void stop() {
+ isReady = false;
+ }
+
+ public void close() {
+ stop();
+ removeAllListener();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java
new file mode 100644
index 0000000..531376a
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.io.Closeable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public abstract class BaseScheduledReporter implements Closeable {
+
+ private static final Logger logger = LoggerFactory.getLogger(BaseScheduledReporter.class);
+
+ private final ScheduledExecutorService executor;
+
+ BaseScheduledReporter() {
+ this("default");
+ }
+
+ BaseScheduledReporter(String name) {
+ this(Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("metrics-scheduler-" + name + "-%d").build()));
+ }
+
+ BaseScheduledReporter(ScheduledExecutorService executor) {
+ this.executor = executor;
+ }
+
+ public abstract void report();
+
+ /**
+ * Starts the reporter polling at the given period.
+ *
+ * @param period the amount of time between polls
+ * @param unit the unit for {@code period}
+ */
+ public void start(long period, TimeUnit unit) {
+ executor.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ report();
+ } catch (RuntimeException ex) {
+ logger.error("RuntimeException thrown from {}#report. Exception was suppressed.",
+ BaseScheduledReporter.this.getClass().getSimpleName(), ex);
+ }
+ }
+ }, period, period, unit);
+ }
+
+ /**
+ * Stops the reporter and shuts down its thread of execution.
+ *
+ * Uses the shutdown pattern from http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html
+ */
+ public void stop() {
+ executor.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+ executor.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+ System.err.println(getClass().getSimpleName() + ": ScheduledExecutorService did not terminate");
+ }
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ executor.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Stops the reporter and shuts down its thread of execution.
+ */
+ @Override
+ public void close() {
+ stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
new file mode 100644
index 0000000..3301867
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class BlockingReservoir extends AbstractActiveReservoir {
+
+ private static final Logger logger = LoggerFactory.getLogger(BlockingReservoir.class);
+
+ private final BlockingQueue<Record> recordsQueue;
+ private final Thread scheduledReporter;
+ private final int MIN_REPORT_SIZE;
+ private final int MAX_REPORT_SIZE;
+ private final long MAX_REPORT_TIME;
+ private List<Record> records;
+
+ public BlockingReservoir() {
+ this(1, 100);
+ }
+
+ public BlockingReservoir(int minReportSize, int maxReportSize) {
+ this(minReportSize, maxReportSize, 10);
+ }
+
+ public BlockingReservoir(int minReportSize, int maxReportSize, int MAX_REPORT_TIME) {
+ this.MAX_REPORT_SIZE = maxReportSize;
+ this.MIN_REPORT_SIZE = minReportSize;
+ this.MAX_REPORT_TIME = MAX_REPORT_TIME * 60 * 1000L;
+
+ this.recordsQueue = new LinkedBlockingQueue<>();
+ this.listeners = Lists.newArrayList();
+
+ this.records = Lists.newArrayListWithExpectedSize(MAX_REPORT_SIZE);
+
+ scheduledReporter = new ThreadFactoryBuilder().setNameFormat("metrics-blocking-reservoir-scheduler-%d").build()
+ .newThread(new ReporterRunnable());
+ }
+
+ public void update(Record record) {
+ if (!isReady) {
+ logger.info("Current reservoir is not ready for update record");
+ return;
+ }
+ try {
+ recordsQueue.put(record);
+ } catch (InterruptedException e) {
+ logger.warn("Thread is interrupted during putting value to blocking queue. \n" + e.toString());
+ }
+ }
+
+ public int size() {
+ return recordsQueue.size();
+ }
+
+ private void onRecordUpdate(boolean ifAll) {
+ if (ifAll) {
+ records = Lists.newArrayList();
+ recordsQueue.drainTo(records);
+ } else {
+ records.clear();
+ recordsQueue.drainTo(records, MAX_REPORT_SIZE);
+ }
+
+ boolean ifSucceed = true;
+ for (ActiveReservoirListener listener : listeners) {
+ if (!notifyListenerOfUpdatedRecord(listener, records)) {
+ ifSucceed = false;
+ logger.warn("It fails to notify listener " + listener.toString() + " of updated records "
+ + records.toString());
+ }
+ }
+ if (!ifSucceed) {
+ notifyListenerHAOfUpdatedRecord(records);
+ }
+ }
+
+ private boolean notifyListenerOfUpdatedRecord(ActiveReservoirListener listener, List<Record> records) {
+ return listener.onRecordUpdate(records);
+ }
+
+ private boolean notifyListenerHAOfUpdatedRecord(List<Record> records) {
+ logger.info("The HA listener " + listenerHA.toString() + " for updated records " + records.toString()
+ + " will be started");
+ if (!notifyListenerOfUpdatedRecord(listenerHA, records)) {
+ logger.error("The HA listener also fails!!!");
+ return false;
+ }
+ return true;
+ }
+
+ public void start() {
+ super.start();
+ scheduledReporter.start();
+ }
+
+ public void stop() {
+ super.stop();
+ scheduledReporter.interrupt();
+ try {
+ scheduledReporter.join();
+ } catch (InterruptedException e) {
+ logger.warn("Interrupted during join");
+ throw new RuntimeException(e);
+ }
+ }
+
+ class ReporterRunnable implements Runnable {
+
+ public void run() {
+ long startTime = System.currentTimeMillis();
+ while (isReady) {
+ if (size() <= 0) {
+ logger.info("There's no record in the blocking queue.");
+ sleep();
+ startTime = System.currentTimeMillis();
+ continue;
+ } else if (size() < MIN_REPORT_SIZE && (System.currentTimeMillis() - startTime < MAX_REPORT_TIME)) {
+ logger.info("The number of records in the blocking queue is less than " + MIN_REPORT_SIZE + //
+ " and the duration from last reporting is less than " + MAX_REPORT_TIME
+ + "ms. Will delay to report!");
+ sleep();
+ continue;
+ }
+
+ onRecordUpdate(false);
+ startTime = System.currentTimeMillis();
+ }
+ onRecordUpdate(true);
+ logger.info("Reporter finishes reporting metrics.");
+ }
+
+ private void sleep() {
+ try {
+ Thread.sleep(60 * 1000);
+ } catch (InterruptedException e) {
+ logger.warn("Interrupted during running");
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java
new file mode 100644
index 0000000..41b53cf
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.util.List;
+
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class InstantReservoir extends AbstractActiveReservoir {
+
+ private static final Logger logger = LoggerFactory.getLogger(InstantReservoir.class);
+
+ public void update(Record record) {
+ if (!isReady) {
+ logger.info("Current reservoir is not ready for update record");
+ return;
+ }
+ onRecordUpdate(record);
+ }
+
+ public int size() {
+ return 0;
+ }
+
+ private void onRecordUpdate(Record record) {
+ boolean ifSucceed = true;
+ for (ActiveReservoirListener listener : listeners) {
+ if (!notifyListenerOfUpdatedRecord(listener, record)) {
+ ifSucceed = false;
+ logger.warn(
+ "It fails to notify listener " + listener.toString() + " of updated record " + record.getKey());
+ }
+ }
+ if (!ifSucceed) {
+ notifyListenerHAOfUpdatedRecord(record);
+ }
+ }
+
+ private boolean notifyListenerOfUpdatedRecord(ActiveReservoirListener listener, Record record) {
+ List<Record> recordsList = Lists.newArrayList();
+ recordsList.add(record);
+ return listener.onRecordUpdate(recordsList);
+ }
+
+ private boolean notifyListenerHAOfUpdatedRecord(Record record) {
+ logger.info("The HA listener " + listenerHA.toString() + " for updated record " + record.getKey()
+ + " will be started");
+ if (!notifyListenerOfUpdatedRecord(listenerHA, record)) {
+ logger.error("The HA listener also fails!!!");
+ return false;
+ }
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java
new file mode 100644
index 0000000..dc0ab66
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.MetricRegistry;
+
+public class MetricsSystem extends MetricRegistry {
+ public static final MetricsSystem Metrics = new MetricsSystem();
+ private static final Logger logger = LoggerFactory.getLogger(MetricsSystem.class);
+ private final ConcurrentHashMap<String, ActiveReservoir> activeReservoirs;
+
+ private MetricsSystem() {
+ activeReservoirs = new ConcurrentHashMap<>();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ logger.info("Closing Metrics System");
+ try {
+ shutdown();
+ } catch (IOException e) {
+ logger.error("error during shutdown activeReservoirs and listeners", e);
+ }
+ logger.info("Closed Metrics System");
+ }
+ });
+ }
+
+ public void shutdown() throws IOException {
+ for (ActiveReservoir entry : activeReservoirs.values()) {
+ entry.close();
+ }
+ }
+
+ public ActiveReservoir activeReservoir(String name) {
+ return getOrAddActiveReservoir(name);
+ }
+
+ public ActiveReservoir register(String name, ActiveReservoir activeReservoir) {
+ if (name == null || activeReservoir == null) {
+ throw new IllegalArgumentException("neither of name or ActiveReservoir can be null");
+ }
+ final ActiveReservoir existingReservoir = activeReservoirs.putIfAbsent(name, activeReservoir);
+ if (existingReservoir == null) {
+ onActiveReservoirAdded(activeReservoir);
+ } else {
+ throw new IllegalArgumentException("An active reservoir named " + name + " already exists");
+ }
+
+ return activeReservoir;
+ }
+
+ /**
+ * Removes the active reservoir with the given name.
+ *
+ * @param name the name of the active reservoir
+ * @return whether or not the active reservoir was removed
+ */
+ public boolean removeActiveReservoir(String name) {
+ final ActiveReservoir recordReservoir = activeReservoirs.remove(name);
+ if (recordReservoir != null) {
+ onActiveReservoirRemoved(recordReservoir);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Removes all active reservoirs which match the given filter.
+ *
+ * @param filter a filter
+ */
+ public void removeActiveReservoirMatching(ActiveReservoirFilter filter) {
+ for (Map.Entry<String, ActiveReservoir> entry : activeReservoirs.entrySet()) {
+ if (filter.matches(entry.getKey(), entry.getValue())) {
+ removeActiveReservoir(entry.getKey());
+ }
+ }
+ }
+
+ private void onActiveReservoirAdded(ActiveReservoir activeReservoir) {
+ activeReservoir.start();
+ }
+
+ private void onActiveReservoirRemoved(ActiveReservoir activeReservoir) {
+ try {
+ activeReservoir.close();
+ } catch (IOException e) {
+ }
+ }
+
+ /**
+ * Returns a map of all the active reservoirs in the metrics system and their names.
+ *
+ * @return all the active reservoirs in the metrics system
+ */
+ public SortedMap<String, ActiveReservoir> getActiveReservoirs() {
+ return getActiveReservoirs(ActiveReservoirFilter.ALL);
+ }
+
+ /**
+ * Returns a map of all the active reservoirs in the metrics system and their names which match the given filter.
+ *
+ * @param filter the active reservoir filter to match
+ * @return all the active reservoirs in the metrics system
+ */
+ public SortedMap<String, ActiveReservoir> getActiveReservoirs(ActiveReservoirFilter filter) {
+ final TreeMap<String, ActiveReservoir> reservoirs = new TreeMap<>();
+ for (Map.Entry<String, ActiveReservoir> entry : activeReservoirs.entrySet()) {
+ if (filter.matches(entry.getKey(), entry.getValue())) {
+ reservoirs.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return Collections.unmodifiableSortedMap(reservoirs);
+ }
+
+ private ActiveReservoir getOrAddActiveReservoir(String name) {
+ ActiveReservoir activeReservoir = activeReservoirs.get(name);
+ if (activeReservoir != null) {
+ return activeReservoir;
+ } else {
+ String defaultActiveReservoirClass = KylinConfig.getInstanceFromEnv()
+ .getMetricsActiveReservoirDefaultClass();
+ try {
+ activeReservoir = (ActiveReservoir) Class.forName(defaultActiveReservoirClass).getConstructor()
+ .newInstance();
+ } catch (Exception e) {
+ logger.warn(
+ "Failed to initialize the " + defaultActiveReservoirClass + ". The StubReservoir will be used");
+ activeReservoir = new StubReservoir();
+ }
+ return register(name, activeReservoir);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java
new file mode 100644
index 0000000..f5bc797
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.metrics.lib.Record;
+
+import com.google.common.collect.Maps;
+
+public class RecordEvent implements Record, Map<String, Object>, Serializable {
+
+ private static final ThreadLocal<ByteArrayOutputStream> _localBaos = new ThreadLocal<ByteArrayOutputStream>();
+
+ static String localHostname;
+ static {
+ try {
+ InetAddress addr = InetAddress.getLocalHost();
+ localHostname = addr.getHostName() + ":" + addr.getHostAddress();
+ } catch (UnknownHostException e) {
+ localHostname = "Unknown";
+ }
+ }
+
+ private final Map<String, Object> backingMap;
+
+ private RecordEvent(Map<String, Object> map) {
+ this.backingMap = map;
+ }
+
+ public RecordEvent(String eventType) {
+ this(eventType, localHostname);
+ }
+
+ public RecordEvent(String eventType, long time) {
+ this(eventType, localHostname, time);
+ }
+
+ public RecordEvent(String eventType, String host) {
+ this(eventType, host, System.currentTimeMillis());
+ }
+
+ public RecordEvent(String eventType, String host, long time) {
+ this(null, eventType, host, time);
+ }
+
+ /**
+ *
+ * @param map
+ * @param eventType mandatory with null check
+ * @param host mandatory without null check
+ * @param time mandatory with null check
+ */
+ public RecordEvent(Map<String, Object> map, String eventType, String host, long time) {
+ backingMap = map != null ? map : Maps.<String, Object> newHashMap();
+ setEventType(eventType);
+ setHost(host);
+ setTime(time);
+ }
+
+ public String getEventType() {
+ return (String) get(RecordReserveKeyEnum.TYPE.toString());
+ }
+
+ private void setEventType(String eventType) {
+ if (eventType == null) {
+ throw new IllegalArgumentException("EventType cannot be null.");
+ }
+ put(RecordReserveKeyEnum.TYPE.toString(), eventType);
+ }
+
+ public String getHost() {
+ return (String) get(RecordReserveKeyEnum.HOST.toString());
+ }
+
+ private void setHost(String host) {
+ put(RecordReserveKeyEnum.HOST.toString(), host);
+ }
+
+ public Long getTime() {
+ return (Long) get(RecordReserveKeyEnum.TIME.toString());
+ }
+
+ private void setTime(Long time) {
+ if (time == null) {
+ throw new IllegalArgumentException("Time cannot be null.");
+ }
+ put(RecordReserveKeyEnum.TIME.toString(), time);
+ }
+
+ public void resetTime() {
+ setTime(System.currentTimeMillis());
+ }
+
+ public String getID() {
+ return (String) get(RecordReserveKeyEnum.ID.toString());
+ }
+
+ public void setID(String id) {
+ put(RecordReserveKeyEnum.ID.toString(), id);
+ }
+
+ @Override
+ public void clear() {
+ backingMap.clear();
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return backingMap.containsKey(key);
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return backingMap.containsValue(value);
+ }
+
+ @Override
+ public Set<Entry<String, Object>> entrySet() {
+ return backingMap.entrySet();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return super.equals(o) || backingMap.equals(o);
+ }
+
+ @Override
+ public Object get(Object key) {
+ return backingMap.get(key);
+ }
+
+ @Override
+ public int hashCode() {
+ return backingMap.hashCode();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return backingMap.isEmpty();
+ }
+
+ @Override
+ public Set<String> keySet() {
+ return backingMap.keySet();
+ }
+
+ @Override
+ public Object put(String key, Object value) {
+ return backingMap.put(key, value);
+ }
+
+ @Override
+ public void putAll(Map<? extends String, ? extends Object> t) {
+ backingMap.putAll(t);
+ }
+
+ @Override
+ public Object remove(Object key) {
+ return backingMap.remove(key);
+ }
+
+ @Override
+ public int size() {
+ return backingMap.size();
+ }
+
+ @Override
+ public String toString() {
+ return backingMap.toString();
+ }
+
+ @Override
+ public Collection<Object> values() {
+ return backingMap.values();
+ }
+
+ @Override
+ public String getType() {
+ return getEventType();
+ }
+
+ @Override
+ public byte[] getKey() {
+ return (getHost() + "-" + getTime() + "-" + getID()).getBytes();
+ }
+
+ @Override
+ /**
+ * Event type and time does not belong to value part
+ */
+ public Map<String, Object> getValueRaw() {
+ Map<String, Object> cloneMap = Maps.newHashMap(backingMap);
+ cloneMap.remove(RecordReserveKeyEnum.TYPE.toString());
+ return cloneMap;
+ }
+
+ @Override
+ /**
+ * Event type does not belong to value part, it's for classification
+ */
+ public byte[] getValue() {
+ try {
+ ByteArrayOutputStream baos = _localBaos.get();
+ if (baos == null) {
+ baos = new ByteArrayOutputStream();
+ _localBaos.set(baos);
+ }
+ baos.reset();
+ JsonUtil.writeValue(baos, getValueRaw());
+ return baos.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);//in mem, should not happen
+ }
+ }
+
+ @Override
+ public RecordEvent clone() {
+ Map<String, Object> cloneMap = Maps.newHashMap();
+ cloneMap.putAll(backingMap);
+ return new RecordEvent(cloneMap);
+ }
+
+ public enum RecordReserveKeyEnum {
+ TYPE("EVENT_TYPE"), ID("EVENT_ID"), HOST("HOST"), TIME("KTIMESTAMP");
+
+ private final String reserveKey;
+
+ private RecordReserveKeyEnum(String key) {
+ this.reserveKey = key;
+ }
+
+ @Override
+ public String toString() {
+ return reserveKey;
+ }
+
+ public RecordReserveKeyEnum getByKey(String key) {
+ for (RecordReserveKeyEnum reserveKey : RecordReserveKeyEnum.values()) {
+ if (reserveKey.reserveKey == key) {
+ return reserveKey;
+ }
+ }
+
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java
new file mode 100644
index 0000000..ff97b9b
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.TimeZone;
+
+import org.apache.kylin.common.KylinConfig;
+
+public class RecordEventTimeDetail {
+ private static final TimeZone timeZone;
+ private static final ThreadLocal<SimpleDateFormat> dateFormatThreadLocal = new ThreadLocal<SimpleDateFormat>();
+ private static final ThreadLocal<SimpleDateFormat> timeFormatThreadLocal = new ThreadLocal<SimpleDateFormat>();
+
+ static {
+ timeZone = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone());
+ }
+
+ public final String year_begin_date;
+ public final String month_begin_date;
+ public final String date;
+ public final String time;
+ public final int hour;
+ public final int minute;
+ public final int second;
+ public final String week_begin_date;
+
+ public RecordEventTimeDetail(long timeStamp) {
+ Calendar calendar = Calendar.getInstance(timeZone);
+ calendar.setTimeInMillis(timeStamp);
+
+ SimpleDateFormat dateFormat = dateFormatThreadLocal.get();
+ if (dateFormat == null) {
+ dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+ dateFormat.setTimeZone(timeZone);
+ dateFormatThreadLocal.set(dateFormat);
+ }
+ SimpleDateFormat timeFormat = timeFormatThreadLocal.get();
+ if (timeFormat == null) {
+ timeFormat = new SimpleDateFormat("HH:mm:ss");
+ timeFormat.setTimeZone(timeZone);
+ timeFormatThreadLocal.set(timeFormat);
+ }
+
+ String yearStr = String.format("%04d", calendar.get(Calendar.YEAR));
+ String monthStr = String.format("%02d", calendar.get(Calendar.MONTH) + 1);
+ this.year_begin_date = yearStr + "-01-01";
+ this.month_begin_date = yearStr + "-" + monthStr + "-01";
+ this.date = dateFormat.format(calendar.getTime());
+ this.time = timeFormat.format(calendar.getTime());
+ this.hour = calendar.get(Calendar.HOUR_OF_DAY);
+ this.minute = calendar.get(Calendar.MINUTE);
+ this.second = calendar.get(Calendar.SECOND);
+
+ long timeStampForWeekBegin = timeStamp;
+ timeStampForWeekBegin -= 3600000 * 24 * (calendar.get(Calendar.DAY_OF_WEEK) - 1);
+ calendar.setTimeInMillis(timeStampForWeekBegin);
+ this.week_begin_date = dateFormat.format(calendar.getTime());
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java
new file mode 100644
index 0000000..7031129
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventWrapper.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.io.Serializable;
+
+import org.apache.kylin.metrics.lib.Record;
+
+public class RecordEventWrapper implements Serializable {
+
+ protected final RecordEvent metricsEvent;
+
+ public RecordEventWrapper(RecordEvent metricsEvent) {
+ this.metricsEvent = metricsEvent;
+
+ //Add time details
+ addTimeDetails();
+ }
+
+ private void addTimeDetails() {
+ RecordEventTimeDetail dateDetail = new RecordEventTimeDetail(metricsEvent.getTime());
+ metricsEvent.put(TimePropertyEnum.YEAR.toString(), dateDetail.year_begin_date);
+ metricsEvent.put(TimePropertyEnum.MONTH.toString(), dateDetail.month_begin_date);
+ metricsEvent.put(TimePropertyEnum.WEEK_BEGIN_DATE.toString(), dateDetail.week_begin_date);
+ metricsEvent.put(TimePropertyEnum.DAY_DATE.toString(), dateDetail.date);
+ metricsEvent.put(TimePropertyEnum.DAY_TIME.toString(), dateDetail.time);
+ metricsEvent.put(TimePropertyEnum.TIME_HOUR.toString(), dateDetail.hour);
+ metricsEvent.put(TimePropertyEnum.TIME_MINUTE.toString(), dateDetail.minute);
+ metricsEvent.put(TimePropertyEnum.TIME_SECOND.toString(), dateDetail.second);
+ }
+
+ public void resetTime() {
+ metricsEvent.resetTime();
+ addTimeDetails();
+ }
+
+ public Record getMetricsRecord() {
+ return metricsEvent;
+ }
+
+ @Override
+ public String toString() {
+ return metricsEvent.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java
new file mode 100644
index 0000000..22fadd3
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.util.Properties;
+
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+
+public abstract class ReporterBuilder {
+ protected final ActiveReservoir registry;
+ protected final Properties props;
+
+ protected ReporterBuilder(ActiveReservoir activeReservoir) {
+ this.registry = activeReservoir;
+ this.props = new Properties();
+ }
+
+ public ReporterBuilder setConfig(Properties props) {
+ if (props != null) {
+ this.props.putAll(props);
+ }
+ return this;
+ }
+
+ /**
+ * Builds a {@link ActiveReservoirReporter} with the given properties.
+ *
+ * @return a {@link ActiveReservoirReporter}
+ */
+ public abstract ActiveReservoirReporter build() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java
new file mode 100644
index 0000000..fe69dec
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.Record;
+
+public class StubReservoir implements ActiveReservoir {
+
+ public void addListener(ActiveReservoirListener listener) {
+ }
+
+ public void removeListener(ActiveReservoirListener listener) {
+ }
+
+ public void removeAllListener() {
+ }
+
+ public void setHAListener(ActiveReservoirListener listener) {
+ }
+
+ public void update(Record record) {
+ }
+
+ public int size() {
+ return 0;
+ }
+
+ public void start() {
+ }
+
+ public void stop() {
+ }
+
+ public void close() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoirReporter.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoirReporter.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoirReporter.java
new file mode 100644
index 0000000..5e0e637
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoirReporter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import java.util.List;
+
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+import org.apache.kylin.metrics.lib.Record;
+
+public class StubReservoirReporter extends ActiveReservoirReporter {
+
+ public static final String STUB_REPORTER_SUFFIX = "STUB";
+
+ public final StubReservoirListener listener = new StubReservoirListener();
+
+ public void start() {
+ }
+
+ public void stop() {
+ }
+
+ public void close() {
+ }
+
+ private class StubReservoirListener implements ActiveReservoirListener {
+
+ public boolean onRecordUpdate(final List<Record> records) {
+ return true;
+ }
+
+ public void close() {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubSink.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubSink.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubSink.java
new file mode 100644
index 0000000..676c59c
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubSink.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import static org.apache.kylin.metrics.lib.ActiveReservoirReporter.KYLIN_PREFIX;
+import static org.apache.kylin.metrics.lib.impl.StubReservoirReporter.STUB_REPORTER_SUFFIX;
+
+import org.apache.kylin.metrics.lib.Sink;
+
+public class StubSink implements Sink {
+ public String getTableFromSubject(String subject) {
+ return KYLIN_PREFIX + "." + STUB_REPORTER_SUFFIX + "_" + subject;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java
----------------------------------------------------------------------
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java
new file mode 100644
index 0000000..1336843
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import com.google.common.base.Strings;
+
+public enum TimePropertyEnum {
+ YEAR("KYEAR_BEGIN_DATE"), MONTH("KMONTH_BEGIN_DATE"), WEEK_BEGIN_DATE("KWEEK_BEGIN_DATE"), DAY_DATE(
+ "KDAY_DATE"), DAY_TIME(
+ "KDAY_TIME"), TIME_HOUR("KTIME_HOUR"), TIME_MINUTE("KTIME_MINUTE"), TIME_SECOND("KTIME_SECOND");
+
+ private final String propertyName;
+
+ TimePropertyEnum(String propertyName) {
+ this.propertyName = propertyName;
+ }
+
+ public static TimePropertyEnum getByPropertyName(String propertyName) {
+ if (Strings.isNullOrEmpty(propertyName)) {
+ return null;
+ }
+ for (TimePropertyEnum property : TimePropertyEnum.values()) {
+ if (property.propertyName.equals(propertyName.toUpperCase())) {
+ return property;
+ }
+ }
+ return null;
+ }
+
+ public String toString() {
+ return propertyName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/metrics-reporter-hive/pom.xml
----------------------------------------------------------------------
diff --git a/metrics-reporter-hive/pom.xml b/metrics-reporter-hive/pom.xml
new file mode 100644
index 0000000..25ca08d
--- /dev/null
+++ b/metrics-reporter-hive/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>kylin-metrics-reporter-hive</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache Kylin - Metrics Reporter Hive</name>
+ <description>Apache Kylin - Metrics Reporter Hive</description>
+
+ <parent>
+ <artifactId>kylin</artifactId>
+ <groupId>org.apache.kylin</groupId>
+ <version>2.2.0-SNAPSHOT</version>
+ </parent>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-metrics</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
----------------------------------------------------------------------
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
new file mode 100644
index 0000000..2ab0090
--- /dev/null
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.hive;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+import org.apache.kylin.metrics.lib.Record;
+import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
+import org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord.RecordKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class HiveProducer {
+
+ private static final Logger logger = LoggerFactory.getLogger(HiveProducer.class);
+
+ private static final int CACHE_MAX_SIZE = 10;
+
+ private final HiveConf hiveConf;
+ private final FileSystem hdfs;
+ private final LoadingCache<Pair<String, String>, Pair<String, List<FieldSchema>>> tableFieldSchemaCache;
+ private final String CONTENT_FILE_NAME;
+
+ public HiveProducer(Properties props) throws Exception {
+ hiveConf = new HiveConf();
+ hdfs = FileSystem.get(hiveConf);
+
+ for (Map.Entry<Object, Object> e : props.entrySet()) {
+ hiveConf.set(e.getKey().toString(), e.getValue().toString());
+ }
+
+ tableFieldSchemaCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<Pair<String, String>, Pair<String, List<FieldSchema>>>() {
+ @Override
+ public void onRemoval(RemovalNotification<Pair<String, String>, Pair<String, List<FieldSchema>>> notification) {
+ logger.info("Field schema with table " + ActiveReservoirReporter.getTableName(notification.getKey()) + " is removed due to " + notification.getCause());
+ }
+ }).maximumSize(CACHE_MAX_SIZE).build(new CacheLoader<Pair<String, String>, Pair<String, List<FieldSchema>>>() {
+ @Override
+ public Pair<String, List<FieldSchema>> load(Pair<String, String> tableName) throws Exception {
+ HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(hiveConf);
+ String tableLocation = metaStoreClient.getTable(tableName.getFirst(), tableName.getSecond()).getSd().getLocation();
+ List<FieldSchema> fields = metaStoreClient.getFields(tableName.getFirst(), tableName.getSecond());
+ metaStoreClient.close();
+ return new Pair(tableLocation, fields);
+ }
+ });
+
+ String hostName;
+ try {
+ hostName = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ hostName = "UNKNOWN";
+ }
+ CONTENT_FILE_NAME = hostName + "-part-0000";
+ }
+
+ public void close() {
+ tableFieldSchemaCache.cleanUp();
+ }
+
+ public void send(final Record record) throws Exception {
+ HiveProducerRecord hiveRecord = convertTo(record);
+ write(hiveRecord.key(), Lists.newArrayList(hiveRecord));
+ }
+
+ public void send(final List<Record> recordList) throws Exception {
+ Map<RecordKey, List<HiveProducerRecord>> recordMap = Maps.newHashMap();
+ for (Record record : recordList) {
+ HiveProducerRecord hiveRecord = convertTo(record);
+ if (recordMap.get(hiveRecord.key()) == null) {
+ recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord> newLinkedList());
+ }
+ recordMap.get(hiveRecord.key()).add(hiveRecord);
+ }
+
+ for (Map.Entry<RecordKey, List<HiveProducerRecord>> entry : recordMap.entrySet()) {
+ write(entry.getKey(), entry.getValue());
+ }
+ }
+
+ private void write(RecordKey recordKey, Iterable<HiveProducerRecord> recordItr) throws Exception {
+ String tableLocation = tableFieldSchemaCache.get(new Pair(recordKey.database(), recordKey.table())).getFirst();
+ StringBuilder sb = new StringBuilder();
+ sb.append(tableLocation);
+ for (Map.Entry<String, String> e : recordKey.partition().entrySet()) {
+ sb.append("/");
+ sb.append(e.getKey().toLowerCase());
+ sb.append("=");
+ sb.append(e.getValue());
+ }
+ Path partitionPath = new Path(sb.toString());
+ if (!hdfs.exists(partitionPath)) {
+ StringBuilder hql = new StringBuilder();
+ hql.append("ALTER TABLE ");
+ hql.append(recordKey.database() + "." + recordKey.table());
+ hql.append(" ADD IF NOT EXISTS PARTITION (");
+ boolean ifFirst = true;
+ for (Map.Entry<String, String> e : recordKey.partition().entrySet()) {
+ if (ifFirst) {
+ ifFirst = false;
+ } else {
+ hql.append(",");
+ }
+ hql.append(e.getKey().toLowerCase());
+ hql.append("='" + e.getValue() + "'");
+ }
+ hql.append(")");
+ Driver driver = new Driver(hiveConf);
+ SessionState.start(new CliSessionState(hiveConf));
+ driver.run(hql.toString());
+ driver.close();
+ }
+ Path partitionContentPath = new Path(partitionPath, CONTENT_FILE_NAME);
+ if (!hdfs.exists(partitionContentPath)) {
+ int nRetry = 0;
+ while (!hdfs.createNewFile(partitionContentPath) && nRetry++ < 5) {
+ if (hdfs.exists(partitionContentPath)) {
+ break;
+ }
+ Thread.sleep(500L * nRetry);
+ }
+ if (!hdfs.exists(partitionContentPath)) {
+ throw new RuntimeException("Fail to create HDFS file: " + partitionContentPath + " after " + nRetry + " retries");
+ }
+ }
+ FSDataOutputStream fout = hdfs.append(partitionContentPath);
+ try {
+ for (HiveProducerRecord elem : recordItr) {
+ fout.writeBytes(elem.valueToString() + "\n");
+ }
+ } catch (IOException e) {
+ logger.error("Fails to write metrics to file " + partitionContentPath.toString());
+ } finally {
+ IOUtils.closeQuietly(fout);
+ }
+ }
+
+ private HiveProducerRecord convertTo(Record record) throws Exception {
+ Map<String, Object> rawValue = record.getValueRaw();
+
+ //Set partition values for hive table
+ Map<String, String> partitionKVs = Maps.newHashMapWithExpectedSize(1);
+ partitionKVs.put(TimePropertyEnum.DAY_DATE.toString(), rawValue.get(TimePropertyEnum.DAY_DATE.toString()).toString());
+
+ return parseToHiveProducerRecord(HiveReservoirReporter.getTableFromSubject(record.getType()), partitionKVs, rawValue);
+ }
+
+ public HiveProducerRecord parseToHiveProducerRecord(String tableName, Map<String, String> partitionKVs, Map<String, Object> rawValue) throws Exception {
+ Pair<String, String> tableNameSplits = ActiveReservoirReporter.getTableNameSplits(tableName);
+ List<FieldSchema> fields = tableFieldSchemaCache.get(tableNameSplits).getSecond();
+ List<Object> columnValues = Lists.newArrayListWithExpectedSize(fields.size());
+ for (FieldSchema fieldSchema : fields) {
+ columnValues.add(rawValue.get(fieldSchema.getName().toUpperCase()));
+ }
+
+ return new HiveProducerRecord(tableNameSplits.getFirst(), tableNameSplits.getSecond(), partitionKVs, columnValues);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/75bd5b61/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
----------------------------------------------------------------------
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
new file mode 100644
index 0000000..8bf93ec
--- /dev/null
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.hive;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Strings;
+
+public class HiveProducerRecord {
+
+ public static final String DELIMITER = ",";
+
+ private final RecordKey key;
+ private final List<Object> value;
+
+ public HiveProducerRecord(String dbName, String tableName, Map<String, String> partitionKVs, List<Object> value) {
+ this.key = new RecordKey(dbName, tableName, partitionKVs);
+ this.value = value;
+ }
+
+ public HiveProducerRecord(String tableName, Map<String, String> partitionKVs, List<Object> value) {
+ this.key = new RecordKey(tableName, partitionKVs);
+ this.value = value;
+ }
+
+ public HiveProducerRecord(String dbName, String tableName, List<Object> value) {
+ this.key = new RecordKey(dbName, tableName);
+ this.value = value;
+ }
+
+ public HiveProducerRecord(String tableName, List<Object> value) {
+ this.key = new RecordKey(tableName);
+ this.value = value;
+ }
+
+ public RecordKey key() {
+ return this.key;
+ }
+
+ public List<Object> value() {
+ return this.value;
+ }
+
+ public String toString() {
+ String value = this.value == null ? "null" : this.value.toString();
+ return "HiveProducerRecord(key=" + this.key.toString() + ", value=" + value + ")";
+ }
+
+ public String valueToString() {
+ if (this.value == null || value.isEmpty()) {
+ return null;
+ }
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < value.size() - 1; i++) {
+ sb.append(value.get(i) + DELIMITER);
+ }
+ sb.append(value.get(value.size() - 1));
+ return sb.toString();
+ }
+
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (!(o instanceof HiveProducerRecord)) {
+ return false;
+ } else {
+ HiveProducerRecord that = (HiveProducerRecord) o;
+ if (this.key != null) {
+ if (!this.key.equals(that.key)) {
+ return false;
+ }
+ } else if (that.key != null) {
+ return false;
+ }
+ if (this.value != null) {
+ if (!this.value.equals(that.value)) {
+ return false;
+ }
+ } else if (that.value != null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public int hashCode() {
+ int result = this.key != null ? this.key.hashCode() : 0;
+ result = 31 * result + (this.value != null ? this.value.hashCode() : 0);
+ return result;
+ }
+
+ public class RecordKey {
+ public static final String DEFAULT_DB_NAME = "DEFAULT";
+
+ private final String dbName;
+ private final String tableName;
+ private final Map<String, String> partitionKVs;
+
+ public RecordKey(String dbName, String tableName, Map<String, String> partitionKVs) {
+ if (Strings.isNullOrEmpty(dbName)) {
+ this.dbName = DEFAULT_DB_NAME;
+ } else {
+ this.dbName = dbName;
+ }
+ this.tableName = tableName;
+ this.partitionKVs = partitionKVs;
+ }
+
+ public RecordKey(String tableName, Map<String, String> partitionKVs) {
+ this(null, tableName, partitionKVs);
+ }
+
+ public RecordKey(String dbName, String tableName) {
+ this(dbName, tableName, null);
+ }
+
+ public RecordKey(String tableName) {
+ this(null, tableName, null);
+ }
+
+ public String database() {
+ return this.dbName;
+ }
+
+ public String table() {
+ return this.tableName;
+ }
+
+ public Map<String, String> partition() {
+ return this.partitionKVs;
+ }
+
+ public String toString() {
+ String partitionKVs = this.partitionKVs == null ? "null" : this.partitionKVs.toString();
+ return "RecordKey(database=" + this.dbName + ", table=" + this.tableName + ", partition=" + partitionKVs + ")";
+ }
+
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (!(o instanceof RecordKey)) {
+ return false;
+ } else {
+ RecordKey that = (RecordKey) o;
+ if (this.dbName != null) {
+ if (!this.dbName.equals(that.dbName)) {
+ return false;
+ }
+ } else if (that.dbName != null) {
+ return false;
+ }
+
+ if (this.tableName != null) {
+ if (!this.tableName.equals(that.tableName)) {
+ return false;
+ }
+ } else if (that.tableName != null) {
+ return false;
+ }
+
+ if (this.partitionKVs != null) {
+ if (!this.partitionKVs.equals(that.partitionKVs)) {
+ return false;
+ }
+ } else if (that.partitionKVs != null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public int hashCode() {
+ int result = this.dbName != null ? this.dbName.hashCode() : 0;
+ result = 31 * result + (this.tableName != null ? this.tableName.hashCode() : 0);
+ result = 31 * result + (this.partitionKVs != null ? this.partitionKVs.hashCode() : 0);
+ return result;
+ }
+ }
+}