You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/10/19 13:43:19 UTC
[3/4] kylin git commit: KYLIN-2722 Introduce a new measure for
dropwizard metrics framework, called active reservoir,
for actively pushing metrics to reporters
http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
----------------------------------------------------------------------
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
new file mode 100644
index 0000000..5af2bf9
--- /dev/null
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.hive;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+import org.apache.kylin.metrics.lib.Record;
+import org.apache.kylin.metrics.lib.impl.ReporterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A reporter which listens for new records and publishes them to hive.
+ *
+ * <p>The reporter owns a single listener that forwards records to a {@link HiveProducer}.
+ * {@link #start()} registers that listener on the {@link ActiveReservoir} and {@link #stop()}
+ * unregisters it again.
+ */
+public class HiveReservoirReporter extends ActiveReservoirReporter {
+
+    public static final String HIVE_REPORTER_SUFFIX = "HIVE";
+    public static final HiveSink sink = new HiveSink();
+    protected static final Logger logger = LoggerFactory.getLogger(HiveReservoirReporter.class);
+    private final ActiveReservoir activeReservoir;
+    private final HiveReservoirListener listener;
+
+    /**
+     * Creates a reporter bound to the given reservoir.
+     *
+     * @param activeReservoir the reservoir whose records are reported
+     * @param props properties handed to the {@link HiveProducer} constructor
+     * @throws Exception if the {@link HiveProducer} cannot be created
+     */
+    public HiveReservoirReporter(ActiveReservoir activeReservoir, Properties props) throws Exception {
+        this.activeReservoir = activeReservoir;
+        this.listener = new HiveReservoirListener(props);
+    }
+
+    /**
+     * Returns a new {@link Builder} for {@link HiveReservoirReporter}.
+     *
+     * @param activeReservoir the registry to report
+     * @return a {@link Builder} instance for a {@link HiveReservoirReporter}
+     */
+    public static Builder forRegistry(ActiveReservoir activeReservoir) {
+        return new Builder(activeReservoir);
+    }
+
+    /**
+     * Maps a metrics subject to its hive table name by delegating to {@link #sink}.
+     *
+     * @param subject the metrics subject
+     * @return the table name produced by {@link HiveSink#getTableFromSubject(String)}
+     */
+    public static String getTableFromSubject(String subject) {
+        return sink.getTableFromSubject(subject);
+    }
+
+    /**
+     * Starts the reporter by registering its listener on the reservoir.
+     */
+    public void start() {
+        activeReservoir.addListener(listener);
+    }
+
+    /**
+     * Stops the reporter by removing its listener from the reservoir.
+     */
+    public void stop() {
+        activeReservoir.removeListener(listener);
+    }
+
+    /**
+     * Stops the reporter; equivalent to {@link #stop()}.
+     *
+     * <p>NOTE(review): only the listener registration is undone here. The listener's
+     * {@link HiveProducer} is released only through the listener's {@code close()}; confirm
+     * that the reservoir invokes it when the listener is removed.
+     */
+    @Override
+    public void close() {
+        stop();
+    }
+
+    /**
+     * A builder for {@link HiveReservoirReporter} instances.
+     */
+    public static class Builder extends ReporterBuilder {
+
+        private Builder(ActiveReservoir activeReservoir) {
+            super(activeReservoir);
+        }
+
+        /** Hook for properties this reporter always enforces; currently there are none. */
+        private void setFixedProperties() {
+        }
+
+        /**
+         * Builds a {@link HiveReservoirReporter} with the given properties.
+         *
+         * @return a {@link HiveReservoirReporter}
+         * @throws Exception if the reporter cannot be constructed
+         */
+        public HiveReservoirReporter build() throws Exception {
+            setFixedProperties();
+            return new HiveReservoirReporter(registry, props);
+        }
+    }
+
+    /**
+     * Listener that forwards records to hive through a {@link HiveProducer}.
+     *
+     * <p>Declared {@code static} because it only touches the shared {@code logger} and its own
+     * producer, so it needs no reference to the enclosing reporter.
+     */
+    private static class HiveReservoirListener implements ActiveReservoirListener {
+
+        private final HiveProducer producer;
+
+        private HiveReservoirListener(Properties props) throws Exception {
+            producer = new HiveProducer(props);
+        }
+
+        /**
+         * Sends a batch of records through the producer.
+         *
+         * @param records the records to publish
+         * @return {@code true} if sending completed, {@code false} if the producer threw
+         */
+        public boolean onRecordUpdate(final List<Record> records) {
+            try {
+                producer.send(records);
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+                return false;
+            }
+            return true;
+        }
+
+        /**
+         * Sends a single record through the producer.
+         *
+         * @param record the record to publish
+         * @return {@code true} if sending completed, {@code false} if the producer threw
+         */
+        public boolean onRecordUpdate(final Record record) {
+            try {
+                producer.send(record);
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+                return false;
+            }
+            return true;
+        }
+
+        /** Closes the underlying producer. */
+        public void close() {
+            producer.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveSink.java
----------------------------------------------------------------------
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveSink.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveSink.java
new file mode 100644
index 0000000..3b0eefe
--- /dev/null
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveSink.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.hive;
+
+import static org.apache.kylin.metrics.lib.ActiveReservoirReporter.KYLIN_PREFIX;
+import static org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter.HIVE_REPORTER_SUFFIX;
+
+import org.apache.kylin.metrics.lib.Sink;
+
+/**
+ * {@link Sink} implementation that derives the hive table names metrics are written to.
+ */
+public class HiveSink implements Sink {
+
+    /**
+     * Builds the table name for a subject as
+     * {@code KYLIN_PREFIX + "." + HIVE_REPORTER_SUFFIX + "_" + subject}.
+     *
+     * @param subject the metrics subject
+     * @return the table name for the subject
+     */
+    public String getTableFromSubject(String subject) {
+        final String tableName = HIVE_REPORTER_SUFFIX + "_" + subject;
+        return KYLIN_PREFIX + "." + tableName;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/metrics-reporter-kafka/pom.xml b/metrics-reporter-kafka/pom.xml
new file mode 100644
index 0000000..ae9fb88
--- /dev/null
+++ b/metrics-reporter-kafka/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>kylin-metrics-reporter-kafka</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache Kylin - Metrics Reporter Kafka</name>
+ <description>Apache Kylin - Metrics Reporter Kafka</description>
+
+ <parent>
+ <artifactId>kylin</artifactId>
+ <groupId>org.apache.kylin</groupId>
+ <version>2.3.0-SNAPSHOT</version>
+ </parent>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-metrics</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
----------------------------------------------------------------------
diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
new file mode 100644
index 0000000..311f3e3
--- /dev/null
+++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.kafka;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base listener that publishes records to Kafka topics and tracks topic availability.
+ *
+ * <p>A topic whose metadata fetch or send times out is marked unavailable. Records for such a
+ * topic are skipped until more than {@link #maxBlockMs} milliseconds have passed, after which the
+ * metadata fetch is attempted again.
+ */
+public abstract class KafkaActiveReserviorListener implements ActiveReservoirListener {
+
+    /** Value stored in {@link #topicsIfAvailable} for a topic that is considered available. */
+    public static final long TOPIC_AVAILABLE_TAG = 0L;
+    protected static final Logger logger = LoggerFactory.getLogger(KafkaActiveReserviorListener.class);
+    /** Time in milliseconds an unavailable topic is skipped before its metadata is fetched again. */
+    protected Long maxBlockMs = 1800000L;
+    /** The logging callback is attached to one send per this many records. */
+    protected int maxRecordForLogNum = 10000;
+    /** A warning is logged for one skipped record per this many skips. */
+    protected int maxRecordSkipForLogNum = 10000;
+    /** Per topic: {@link #TOPIC_AVAILABLE_TAG}, or the time the topic was marked unavailable. */
+    protected ConcurrentHashMap<String, Long> topicsIfAvailable = new ConcurrentHashMap<>();
+    private int nRecord = 0;
+    private int nRecordSkip = 0;
+    private final Callback produceCallback = new Callback() {
+        @Override
+        public void onCompletion(RecordMetadata metadata, Exception exception) {
+            if (exception != null) {
+                // Report through the logger rather than printStackTrace() so the failure lands
+                // in the application log together with the reporter's other messages.
+                logger.error("Failed to send record to Kafka", exception);
+                return;
+            }
+            logger.info("topic:{}; partition: {}; offset: {}", metadata.topic(), metadata.partition(),
+                    metadata.offset());
+        }
+    };
+
+    /**
+     * Turns the given name into the Kafka topic to publish to. {@link #onRecordUpdate(List)}
+     * passes the record type.
+     */
+    protected abstract String decorateTopic(String topic);
+
+    /**
+     * Fetches metadata for the topic. A {@link org.apache.kafka.common.errors.TimeoutException}
+     * thrown here makes {@link #checkAvailable(String)} mark the topic unavailable.
+     */
+    protected abstract void tryFetchMetadataFor(String topic);
+
+    /**
+     * Sends one record to the topic.
+     *
+     * @param callback completion callback; {@link #onRecordUpdate(List)} passes {@code null} for
+     *            sends it does not want logged
+     */
+    protected abstract void send(String topic, Record record, Callback callback);
+
+    /**
+     * Sends a record, marking the topic unavailable when the send times out.
+     *
+     * @throws org.apache.kafka.common.errors.TimeoutException rethrown after the topic is marked
+     */
+    protected void sendWrapper(String topic, Record record, Callback callback) {
+        try {
+            send(topic, record, callback);
+        } catch (org.apache.kafka.common.errors.TimeoutException e) {
+            setUnAvailable(topic);
+            throw e;
+        }
+    }
+
+    /**
+     * Publishes each record to the topic derived from its type, skipping unavailable topics.
+     *
+     * @param records the records to publish
+     * @return {@code true} if the whole batch was processed, {@code false} if an exception
+     *         interrupted it (the exception is logged)
+     */
+    public boolean onRecordUpdate(final List<Record> records) {
+        try {
+            for (Record record : records) {
+                String topic = decorateTopic(record.getType());
+                if (!checkAvailable(topic)) {
+                    if (nRecordSkip % maxRecordSkipForLogNum == 0) {
+                        nRecordSkip = 0;
+                        logger.warn("Skip to send record to topic {}", topic);
+                    }
+                    nRecordSkip++;
+                    continue;
+                }
+                if (nRecord % maxRecordForLogNum == 0) {
+                    nRecord = 0;
+                    sendWrapper(topic, record, produceCallback);
+                } else {
+                    sendWrapper(topic, record, null);
+                }
+                nRecord++;
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Tells whether records may currently be sent to the topic.
+     *
+     * <p>An unknown topic, or one that has been unavailable for more than {@link #maxBlockMs},
+     * is probed with {@link #tryFetchMetadataFor(String)}; the outcome is recorded.
+     *
+     * @return {@code true} if the topic is marked available or the probe succeeded
+     */
+    protected boolean checkAvailable(String topic) {
+        Long timeBlock = topicsIfAvailable.get(topic);
+        if (timeBlock != null && timeBlock == TOPIC_AVAILABLE_TAG) {
+            return true;
+        } else if (timeBlock == null || System.currentTimeMillis() - timeBlock > maxBlockMs) {
+            try {
+                tryFetchMetadataFor(topic);
+                topicsIfAvailable.put(topic, TOPIC_AVAILABLE_TAG);
+                return true;
+            } catch (org.apache.kafka.common.errors.TimeoutException e) {
+                // Log the cause as well so the details of the timeout are not lost.
+                logger.warn("Fail to fetch metadata for topic " + topic, e);
+                setUnAvailable(topic);
+                return false;
+            }
+        }
+        return false;
+    }
+
+    /** Marks the topic unavailable as of the current time. */
+    protected void setUnAvailable(String topic) {
+        topicsIfAvailable.put(topic, System.currentTimeMillis());
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
----------------------------------------------------------------------
diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
new file mode 100644
index 0000000..a5ea3aa
--- /dev/null
+++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.kafka;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+import org.apache.kylin.metrics.lib.Record;
+import org.apache.kylin.metrics.lib.impl.ReporterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A reporter which listens for new records and publishes them to Kafka.
+ *
+ * <p>The reporter owns a single listener backed by a {@link KafkaProducer}. {@link #start()}
+ * registers that listener on the {@link ActiveReservoir} and {@link #stop()} unregisters it.
+ */
+public class KafkaReservoirReporter extends ActiveReservoirReporter {
+
+    public static final String KAFKA_REPORTER_SUFFIX = "KAFKA";
+    public static final KafkaSink sink = new KafkaSink();
+    protected static final Logger logger = LoggerFactory.getLogger(KafkaReservoirReporter.class);
+    private final ActiveReservoir activeReservoir;
+    private final KafkaReservoirListener listener;
+
+    private KafkaReservoirReporter(ActiveReservoir activeReservoir, Properties props) {
+        this.activeReservoir = activeReservoir;
+        this.listener = new KafkaReservoirListener(props);
+    }
+
+    /**
+     * Returns a new {@link Builder} for {@link KafkaReservoirReporter}.
+     *
+     * @param activeReservoir the registry to report
+     * @return a {@link Builder} instance for a {@link KafkaReservoirReporter}
+     */
+    public static Builder forRegistry(ActiveReservoir activeReservoir) {
+        return new Builder(activeReservoir);
+    }
+
+    /** Joins the Kylin prefix, the reporter suffix and {@code topic} with "_". */
+    private static String decorateTopic(String topic) {
+        return ActiveReservoirReporter.KYLIN_PREFIX + "_" + KAFKA_REPORTER_SUFFIX + "_" + topic;
+    }
+
+    /**
+     * Maps a metrics subject to a table name by delegating to {@link #sink}.
+     *
+     * @param subject the metrics subject
+     * @return the name produced by {@link KafkaSink#getTableFromSubject(String)}
+     */
+    public static String getTableFromSubject(String subject) {
+        return sink.getTableFromSubject(subject);
+    }
+
+    /**
+     * Starts the reporter by registering its listener on the reservoir.
+     */
+    public void start() {
+        activeReservoir.addListener(listener);
+    }
+
+    /**
+     * Stops the reporter by removing its listener from the reservoir.
+     */
+    public void stop() {
+        activeReservoir.removeListener(listener);
+    }
+
+    /**
+     * Stops the reporter; equivalent to {@link #stop()}.
+     *
+     * <p>NOTE(review): only the listener registration is undone here. The listener's Kafka
+     * producer is closed only through the listener's {@code close()}; confirm that the
+     * reservoir invokes it when the listener is removed.
+     */
+    @Override
+    public void close() {
+        stop();
+    }
+
+    /**
+     * A builder for {@link KafkaReservoirReporter} instances.
+     */
+    public static class Builder extends ReporterBuilder {
+
+        private Builder(ActiveReservoir activeReservoir) {
+            super(activeReservoir);
+        }
+
+        /** Forces byte-array key and value serializers, overriding any configured ones. */
+        private void setFixedProperties() {
+            props.put("key.serializer", ByteArraySerializer.class.getName());
+            props.put("value.serializer", ByteArraySerializer.class.getName());
+        }
+
+        /**
+         * Builds a {@link KafkaReservoirReporter} with the given properties.
+         *
+         * @return a {@link KafkaReservoirReporter}
+         */
+        public KafkaReservoirReporter build() {
+            setFixedProperties();
+            return new KafkaReservoirReporter(registry, props);
+        }
+    }
+
+    /**
+     * Listener that publishes records through a {@link KafkaProducer}.
+     *
+     * <p>Declared {@code static} because it only uses static members of the enclosing class, so
+     * it needs no reference to a reporter instance.
+     */
+    private static class KafkaReservoirListener extends KafkaActiveReserviorListener {
+        private final Producer<byte[], byte[]> producer;
+
+        private KafkaReservoirListener(Properties props) {
+            producer = new KafkaProducer<>(props);
+        }
+
+        /** Asks the producer for the partition metadata of the topic. */
+        public void tryFetchMetadataFor(String topic) {
+            producer.partitionsFor(topic);
+        }
+
+        protected String decorateTopic(String topic) {
+            return KafkaReservoirReporter.decorateTopic(topic);
+        }
+
+        protected void send(String topic, Record record, Callback callback) {
+            producer.send(new ProducerRecord<>(topic, record.getKey(), record.getValue()), callback);
+        }
+
+        /** Closes the underlying producer. */
+        public void close() {
+            producer.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaSink.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaSink.java
new file mode 100644
index 0000000..f756b8a
--- /dev/null
+++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaSink.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.kafka;
+
+import static org.apache.kylin.metrics.lib.ActiveReservoirReporter.KYLIN_PREFIX;
+
+import org.apache.kylin.metrics.lib.Sink;
+
+/**
+ * {@link Sink} implementation that derives table names for metrics reported through Kafka.
+ */
+public class KafkaSink implements Sink {
+
+    /**
+     * Builds the table name for a subject as
+     * {@code KYLIN_PREFIX + "." + KAFKA_REPORTER_SUFFIX + "_" + subject}.
+     *
+     * @param subject the metrics subject
+     * @return the table name for the subject
+     */
+    public String getTableFromSubject(String subject) {
+        final String tableName = KafkaReservoirReporter.KAFKA_REPORTER_SUFFIX + "_" + subject;
+        return KYLIN_PREFIX + "." + tableName;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index de5da05..ffbdf03 100644
--- a/pom.xml
+++ b/pom.xml
@@ -209,6 +209,21 @@
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-metrics</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-metrics-reporter-hive</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-metrics-reporter-kafka</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
<artifactId>kylin-core-metadata</artifactId>
<version>${project.version}</version>
</dependency>
@@ -650,6 +665,13 @@
<version>${slf4j.version}</version>
</dependency>
+ <!-- Metrics -->
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${dropwizard.version}</version>
+ </dependency>
+
<!-- Test -->
<dependency>
<groupId>junit</groupId>
@@ -1126,6 +1148,9 @@
<module>tool-assembly</module>
<module>kylin-it</module>
<module>tomcat-ext</module>
+ <module>core-metrics</module>
+ <module>metrics-reporter-hive</module>
+ <module>metrics-reporter-kafka</module>
</modules>
<reporting>