You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/10/19 13:43:19 UTC

[3/4] kylin git commit: KYLIN-2722 Introduce a new measure for the Dropwizard Metrics framework, called active reservoir, for actively pushing metrics to reporters

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
----------------------------------------------------------------------
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
new file mode 100644
index 0000000..5af2bf9
--- /dev/null
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.hive;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+import org.apache.kylin.metrics.lib.Record;
+import org.apache.kylin.metrics.lib.impl.ReporterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A reporter which listens for new records on an {@link ActiveReservoir} and publishes them to Hive
+ * through a {@link HiveProducer}.
+ * <p>
+ * The reporter's listener is registered with the reservoir by {@link #start()} and removed by
+ * {@link #stop()} / {@link #close()}.
+ */
+public class HiveReservoirReporter extends ActiveReservoirReporter {
+
+    public static final String HIVE_REPORTER_SUFFIX = "HIVE";
+    public static final HiveSink sink = new HiveSink();
+    protected static final Logger logger = LoggerFactory.getLogger(HiveReservoirReporter.class);
+    private final ActiveReservoir activeReservoir;
+    private final HiveReservoirListener listener;
+
+    /**
+     * Creates a reporter for {@code activeReservoir}. No records are forwarded until {@link #start()}.
+     *
+     * @param activeReservoir the reservoir to report
+     * @param props           properties handed to the underlying {@link HiveProducer}
+     * @throws Exception if the {@link HiveProducer} cannot be created
+     */
+    public HiveReservoirReporter(ActiveReservoir activeReservoir, Properties props) throws Exception {
+        this.activeReservoir = activeReservoir;
+        this.listener = new HiveReservoirListener(props);
+    }
+
+    /**
+     * Returns a new {@link Builder} for {@link HiveReservoirReporter}.
+     *
+     * @param activeReservoir the registry to report
+     * @return a {@link Builder} instance for a {@link HiveReservoirReporter}
+     */
+    public static Builder forRegistry(ActiveReservoir activeReservoir) {
+        return new Builder(activeReservoir);
+    }
+
+    /**
+     * Maps a metrics subject to its Hive table name; delegates to {@link HiveSink#getTableFromSubject(String)}.
+     *
+     * @param subject the metrics subject
+     * @return the table name produced by the shared {@link #sink}
+     */
+    public static String getTableFromSubject(String subject) {
+        return sink.getTableFromSubject(subject);
+    }
+
+    /**
+     * Starts the reporter by registering its listener with the reservoir.
+     */
+    public void start() {
+        activeReservoir.addListener(listener);
+    }
+
+    /**
+     * Stops the reporter by removing its listener from the reservoir.
+     */
+    public void stop() {
+        activeReservoir.removeListener(listener);
+    }
+
+    /**
+     * Closes the reporter; equivalent to {@link #stop()}.
+     * <p>
+     * NOTE(review): the listener's {@link HiveProducer} is only released if
+     * {@code ActiveReservoir#removeListener} closes the listener — confirm, otherwise it leaks.
+     */
+    @Override
+    public void close() {
+        stop();
+    }
+
+    /**
+     * A builder for {@link HiveReservoirReporter} instances.
+     */
+    public static class Builder extends ReporterBuilder {
+
+        private Builder(ActiveReservoir activeReservoir) {
+            super(activeReservoir);
+        }
+
+        /** Hook for properties the Hive reporter always enforces; currently none. */
+        private void setFixedProperties() {
+        }
+
+        /**
+         * Builds a {@link HiveReservoirReporter} with the given properties.
+         *
+         * @return a {@link HiveReservoirReporter}
+         * @throws Exception if the underlying {@link HiveProducer} cannot be created
+         */
+        public HiveReservoirReporter build() throws Exception {
+            setFixedProperties();
+            return new HiveReservoirReporter(registry, props);
+        }
+    }
+
+    /**
+     * Listener that forwards records to Hive through a {@link HiveProducer}. It is a static nested class
+     * because it needs no state from the enclosing reporter.
+     */
+    private static class HiveReservoirListener implements ActiveReservoirListener {
+
+        private final HiveProducer producer;
+
+        private HiveReservoirListener(Properties props) throws Exception {
+            producer = new HiveProducer(props);
+        }
+
+        /**
+         * Sends a batch of records.
+         *
+         * @return {@code true} on success, {@code false} if the producer threw (the error is logged)
+         */
+        @Override
+        public boolean onRecordUpdate(final List<Record> records) {
+            try {
+                producer.send(records);
+                return true;
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+                return false;
+            }
+        }
+
+        /**
+         * Sends a single record.
+         *
+         * @return {@code true} on success, {@code false} if the producer threw (the error is logged)
+         */
+        public boolean onRecordUpdate(final Record record) {
+            try {
+                producer.send(record);
+                return true;
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+                return false;
+            }
+        }
+
+        /** Closes the underlying {@link HiveProducer}. */
+        public void close() {
+            producer.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveSink.java
----------------------------------------------------------------------
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveSink.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveSink.java
new file mode 100644
index 0000000..3b0eefe
--- /dev/null
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveSink.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.hive;
+
+import static org.apache.kylin.metrics.lib.ActiveReservoirReporter.KYLIN_PREFIX;
+import static org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter.HIVE_REPORTER_SUFFIX;
+
+import org.apache.kylin.metrics.lib.Sink;
+
+public class HiveSink implements Sink {
+    public String getTableFromSubject(String subject) {
+        return KYLIN_PREFIX + "." + HIVE_REPORTER_SUFFIX + "_" + subject;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/metrics-reporter-kafka/pom.xml b/metrics-reporter-kafka/pom.xml
new file mode 100644
index 0000000..ae9fb88
--- /dev/null
+++ b/metrics-reporter-kafka/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <!-- Kafka implementation of the Kylin metrics reporter (KafkaReservoirReporter and friends). -->
+    <artifactId>kylin-metrics-reporter-kafka</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache Kylin - Metrics Reporter Kafka</name>
+    <description>Apache Kylin - Metrics Reporter Kafka</description>
+
+    <parent>
+        <artifactId>kylin</artifactId>
+        <groupId>org.apache.kylin</groupId>
+        <version>2.3.0-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <!-- Core metrics API (ActiveReservoir, Record, ...); version is managed by the parent pom. -->
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-metrics</artifactId>
+        </dependency>
+
+        <!-- Kafka client: compiled against here, but must be supplied by the runtime classpath (provided scope). -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
----------------------------------------------------------------------
diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
new file mode 100644
index 0000000..311f3e3
--- /dev/null
+++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.kafka;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class KafkaActiveReserviorListener implements ActiveReservoirListener {
+
+    public static final long TOPIC_AVAILABLE_TAG = 0L;
+    protected static final Logger logger = LoggerFactory.getLogger(KafkaActiveReserviorListener.class);
+    protected Long maxBlockMs = 1800000L;
+    protected int maxRecordForLogNum = 10000;
+    protected int maxRecordSkipForLogNum = 10000;
+    protected ConcurrentHashMap<String, Long> topicsIfAvailable = new ConcurrentHashMap<>();
+    private int nRecord = 0;
+    private int nRecordSkip = 0;
+    private Callback produceCallback = new Callback() {
+        @Override
+        public void onCompletion(RecordMetadata metadata, Exception exception) {
+            if (exception != null) {
+                exception.printStackTrace();
+                return;
+            }
+            logger.info("topic:" + metadata.topic() + "; partition: " + metadata.partition() + "; offset: " + metadata.offset());
+        }
+    };
+
+    protected abstract String decorateTopic(String topic);
+
+    protected abstract void tryFetchMetadataFor(String topic);
+
+    protected abstract void send(String topic, Record record, Callback callback);
+
+    protected void sendWrapper(String topic, Record record, Callback callback) {
+        try {
+            send(topic, record, callback);
+        } catch (org.apache.kafka.common.errors.TimeoutException e) {
+            setUnAvailable(topic);
+            throw e;
+        }
+    }
+
+    public boolean onRecordUpdate(final List<Record> records) {
+        try {
+            for (Record record : records) {
+                String topic = decorateTopic(record.getType());
+                if (!checkAvailable(topic)) {
+                    if (nRecordSkip % maxRecordSkipForLogNum == 0) {
+                        nRecordSkip = 0;
+                        logger.warn("Skip to send record to topic " + topic);
+                    }
+                    nRecordSkip++;
+                    continue;
+                }
+                if (nRecord % maxRecordForLogNum == 0) {
+                    nRecord = 0;
+                    sendWrapper(topic, record, produceCallback);
+                } else {
+                    sendWrapper(topic, record, null);
+                }
+                nRecord++;
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return false;
+        }
+        return true;
+    }
+
+    protected boolean checkAvailable(String topic) {
+        Long timeBlock = topicsIfAvailable.get(topic);
+        if (timeBlock != null && timeBlock == TOPIC_AVAILABLE_TAG) {
+            return true;
+        } else if (timeBlock == null || System.currentTimeMillis() - timeBlock > maxBlockMs) {
+            try {
+                tryFetchMetadataFor(topic);
+                topicsIfAvailable.put(topic, TOPIC_AVAILABLE_TAG);
+                return true;
+            } catch (org.apache.kafka.common.errors.TimeoutException e) {
+                logger.warn("Fail to fetch metadata for topic " + topic);
+                setUnAvailable(topic);
+                return false;
+            }
+        }
+        return false;
+    }
+
+    protected void setUnAvailable(String topic) {
+        topicsIfAvailable.put(topic, System.currentTimeMillis());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
----------------------------------------------------------------------
diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
new file mode 100644
index 0000000..a5ea3aa
--- /dev/null
+++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.kafka;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
+import org.apache.kylin.metrics.lib.Record;
+import org.apache.kylin.metrics.lib.impl.ReporterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A reporter which listens for new records on an {@link ActiveReservoir} and publishes them to Kafka.
+ * <p>
+ * Each record goes to the topic {@code KYLIN_PREFIX + "_" + KAFKA_REPORTER_SUFFIX + "_" + record type}.
+ * The reporter's listener is registered with the reservoir by {@link #start()} and removed by
+ * {@link #stop()} / {@link #close()}.
+ */
+public class KafkaReservoirReporter extends ActiveReservoirReporter {
+
+    public static final String KAFKA_REPORTER_SUFFIX = "KAFKA";
+    public static final KafkaSink sink = new KafkaSink();
+    protected static final Logger logger = LoggerFactory.getLogger(KafkaReservoirReporter.class);
+    private final ActiveReservoir activeReservoir;
+    private final KafkaReservoirListener listener;
+
+    /**
+     * Creates a reporter for {@code activeReservoir}; no records are forwarded until {@link #start()}.
+     *
+     * @param activeReservoir the reservoir to report
+     * @param props           configuration handed to the underlying {@link KafkaProducer}
+     */
+    private KafkaReservoirReporter(ActiveReservoir activeReservoir, Properties props) {
+        this.activeReservoir = activeReservoir;
+        this.listener = new KafkaReservoirListener(props);
+    }
+
+    /**
+     * Returns a new {@link Builder} for {@link KafkaReservoirReporter}.
+     *
+     * @param activeReservoir the registry to report
+     * @return a {@link Builder} instance for a {@link KafkaReservoirReporter}
+     */
+    public static Builder forRegistry(ActiveReservoir activeReservoir) {
+        return new Builder(activeReservoir);
+    }
+
+    /** Builds the Kafka topic name for a record type. */
+    private static String decorateTopic(String topic) {
+        return ActiveReservoirReporter.KYLIN_PREFIX + "_" + KAFKA_REPORTER_SUFFIX + "_" + topic;
+    }
+
+    /**
+     * Maps a metrics subject to its table name; delegates to {@link KafkaSink#getTableFromSubject(String)}.
+     *
+     * @param subject the metrics subject
+     * @return the table name produced by the shared {@link #sink}
+     */
+    public static String getTableFromSubject(String subject) {
+        return sink.getTableFromSubject(subject);
+    }
+
+    /**
+     * Starts the reporter by registering its listener with the reservoir.
+     */
+    public void start() {
+        activeReservoir.addListener(listener);
+    }
+
+    /**
+     * Stops the reporter by removing its listener from the reservoir.
+     */
+    public void stop() {
+        activeReservoir.removeListener(listener);
+    }
+
+    /**
+     * Closes the reporter; equivalent to {@link #stop()}.
+     * <p>
+     * NOTE(review): the listener's {@link KafkaProducer} is only released if
+     * {@code ActiveReservoir#removeListener} closes the listener — confirm, otherwise it leaks.
+     */
+    @Override
+    public void close() {
+        stop();
+    }
+
+    /**
+     * A builder for {@link KafkaReservoirReporter} instances.
+     */
+    public static class Builder extends ReporterBuilder {
+
+        private Builder(ActiveReservoir activeReservoir) {
+            super(activeReservoir);
+        }
+
+        /** Forces byte-array key/value serializers, overriding any user-supplied ones. */
+        private void setFixedProperties() {
+            props.put("key.serializer", ByteArraySerializer.class.getName());
+            props.put("value.serializer", ByteArraySerializer.class.getName());
+        }
+
+        /**
+         * Builds a {@link KafkaReservoirReporter} with the given properties.
+         *
+         * @return a {@link KafkaReservoirReporter}
+         */
+        public KafkaReservoirReporter build() {
+            setFixedProperties();
+            return new KafkaReservoirReporter(registry, props);
+        }
+    }
+
+    /**
+     * Listener backed by a {@link KafkaProducer}. It is a static nested class because it needs no state from
+     * the enclosing reporter; only the static {@link KafkaReservoirReporter#decorateTopic(String)} is used.
+     */
+    private static class KafkaReservoirListener extends KafkaActiveReserviorListener {
+        protected final Producer<byte[], byte[]> producer;
+
+        private KafkaReservoirListener(Properties props) {
+            producer = new KafkaProducer<>(props);
+        }
+
+        /** Asks the producer for the topic's partitions, forcing a metadata fetch. */
+        @Override
+        public void tryFetchMetadataFor(String topic) {
+            producer.partitionsFor(topic);
+        }
+
+        @Override
+        protected String decorateTopic(String topic) {
+            // Qualified call: an unqualified decorateTopic(topic) here would recurse into this method.
+            return KafkaReservoirReporter.decorateTopic(topic);
+        }
+
+        @Override
+        protected void send(String topic, Record record, Callback callback) {
+            producer.send(new ProducerRecord<>(topic, record.getKey(), record.getValue()), callback);
+        }
+
+        /** Closes the underlying {@link KafkaProducer}. */
+        public void close() {
+            producer.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaSink.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaSink.java
new file mode 100644
index 0000000..f756b8a
--- /dev/null
+++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaSink.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl.kafka;
+
+import static org.apache.kylin.metrics.lib.ActiveReservoirReporter.KYLIN_PREFIX;
+
+import org.apache.kylin.metrics.lib.Sink;
+
+public class KafkaSink implements Sink {
+
+    /**
+     * Builds the table name for a metrics subject, in the form
+     * {@code KYLIN_PREFIX + "." + KAFKA_REPORTER_SUFFIX + "_" + subject}.
+     *
+     * @param subject the metrics subject
+     * @return the corresponding table name
+     */
+    public String getTableFromSubject(String subject) {
+        final String suffix = KafkaReservoirReporter.KAFKA_REPORTER_SUFFIX;
+        return String.format("%s.%s_%s", KYLIN_PREFIX, suffix, subject);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/55629865/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index de5da05..ffbdf03 100644
--- a/pom.xml
+++ b/pom.xml
@@ -209,6 +209,21 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.kylin</groupId>
+                <artifactId>kylin-core-metrics</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kylin</groupId>
+                <artifactId>kylin-metrics-reporter-hive</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kylin</groupId>
+                <artifactId>kylin-metrics-reporter-kafka</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kylin</groupId>
                 <artifactId>kylin-core-metadata</artifactId>
                 <version>${project.version}</version>
             </dependency>
@@ -650,6 +665,13 @@
                 <version>${slf4j.version}</version>
             </dependency>
 
+            <!-- Metrics -->
+            <dependency>
+                <groupId>io.dropwizard.metrics</groupId>
+                <artifactId>metrics-core</artifactId>
+                <version>${dropwizard.version}</version>
+            </dependency>
+
             <!-- Test -->
             <dependency>
                 <groupId>junit</groupId>
@@ -1126,6 +1148,9 @@
         <module>tool-assembly</module>
         <module>kylin-it</module>
         <module>tomcat-ext</module>
+        <module>core-metrics</module>
+        <module>metrics-reporter-hive</module>
+        <module>metrics-reporter-kafka</module>
     </modules>
 
     <reporting>