You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/23 03:14:14 UTC

[kylin] 01/30: KYLIN-3846 Create Flink engine module and initialize module structure

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 1839a26e24b42fb96e265d7de7423dc74d64fdb0
Author: yanghua <ya...@gmail.com>
AuthorDate: Thu Mar 7 23:50:35 2019 +0800

    KYLIN-3846 Create Flink engine module and initialize module structure
---
 engine-flink/pom.xml                               | 145 +++++++++++++++++++++
 .../org/apache/kylin/engine/flink/IFlinkInput.java |  60 +++++++++
 .../apache/kylin/engine/flink/IFlinkOutput.java    | 132 +++++++++++++++++++
 pom.xml                                            |   6 +
 4 files changed, 343 insertions(+)

diff --git a/engine-flink/pom.xml b/engine-flink/pom.xml
new file mode 100644
index 0000000..04ae57a
--- /dev/null
+++ b/engine-flink/pom.xml
@@ -0,0 +1,145 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kylin-engine-flink</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache Kylin - Flink Engine</name>
+    <description>Apache Kylin - Flink Engine</description>
+
+    <parent>
+        <artifactId>kylin</artifactId>
+        <groupId>org.apache.kylin</groupId>
+        <version>3.0.0-SNAPSHOT</version>
+    </parent>
+
+    <properties>
+        <flink.version>1.7.1</flink.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-job</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-engine-mr</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Flink dependency -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_2.11</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <!-- HBase dependency (provided by the deployment environment) -->
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.maven</groupId>
+            <artifactId>maven-model</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.reflections</groupId>
+            <artifactId>reflections</artifactId>
+            <version>0.9.10</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Scala dependencies are 'provided' at runtime by the Flink distribution -->
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>3.3.2</version>
+                <executions>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/IFlinkInput.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/IFlinkInput.java
new file mode 100644
index 0000000..1c9dac0
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/IFlinkInput.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
+
+/**
+ * Flink engine (cubing & merge) input side interface.
+ */
+public interface IFlinkInput {
+
+    /** Return a helper to participate in batch cubing job flow. */
+    IFlinkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
+
+    /** Return a helper to participate in batch cubing merge job flow. */
+    IFlinkBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
+
+    /**
+     * Participate the batch cubing flow as the input side. Responsible for creating
+     * intermediate flat table (Phase 1) and clean up any leftover (Phase 4).
+     *
+     * - Phase 1: Create Flat Table
+     * - Phase 2: Build Dictionary (with FlatTableInputFormat)
+     * - Phase 3: Build Cube (with FlatTableInputFormat)
+     * - Phase 4: Update Metadata & Cleanup
+     */
+    interface IFlinkBatchCubingInputSide {
+
+        /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
+        void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
+
+        /** Add step that does necessary clean up, like delete the intermediate flat table */
+        void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+
+    /** Participate the batch merge flow as the input side. */
+    interface IFlinkBatchMergeInputSide {
+
+        /** Add step that executes before merge dictionary and before merge cube. */
+        void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+    }
+
+}
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/IFlinkOutput.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/IFlinkOutput.java
new file mode 100644
index 0000000..15c0a73
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/IFlinkOutput.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+import java.util.List;
+
+/**
+ * Flink engine (cubing & merge) output side interface.
+ */
+public interface IFlinkOutput {
+
+    /** Return a helper to participate in batch cubing job flow. */
+    IFlinkBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg);
+
+    /**
+     * Participate the batch cubing flow as the output side. Responsible for saving
+     * the cuboid output to storage at the end of Phase 3.
+     *
+     * - Phase 1: Create Flat Table
+     * - Phase 2: Build Dictionary
+     * - Phase 3: Build Cube
+     * - Phase 4: Update Metadata & Cleanup
+     */
+    interface IFlinkBatchCubingOutputSide {
+
+        /** Add step that executes after build dictionary and before build cube. */
+        void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow);
+
+        /**
+         * Add step that saves cuboids from HDFS to storage.
+         *
+         * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn,
+         * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+         * dictionary encoding; Mx is measure value serialization form.
+         */
+        void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
+
+        /** Add step that does any necessary clean up. */
+        void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+
+    }
+
+    /** Return a helper to participate in batch merge job flow. */
+    IFlinkBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg);
+
+    /**
+     * Participate the batch merge flow as the output side. Responsible for saving
+     * the merged cuboid output to storage at the end of Phase 2.
+     *
+     * - Phase 1: Merge Dictionary
+     * - Phase 2: Merge Cube
+     * - Phase 3: Update Metadata & Cleanup
+     */
+    interface IFlinkBatchMergeOutputSide {
+
+        /** Add step that executes after merge dictionary and before merge cube. */
+        void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+        /**
+         * Add step that saves cuboid output from HDFS to storage.
+         *
+         * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn,
+         * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+         * dictionary encoding; Mx is measure value serialization form.
+         */
+        void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments, DefaultChainedExecutable jobFlow);
+
+        /** Add step that does any necessary clean up. */
+        void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+
+    }
+
+    /** Hadoop job input/output format hooks used by the merge flow. */
+    interface IFlinkMergeOutputFormat {
+
+        /** Configure the InputFormat of given job. */
+        void configureJobInput(Job job, String input) throws Exception;
+
+        /** Configure the OutputFormat of given job. */
+        void configureJobOutput(Job job, String output, CubeSegment segment) throws Exception;
+
+        /** Locate the source segment that a given input split belongs to. */
+        CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube);
+    }
+
+    /** Return a helper to participate in batch optimize job flow. */
+    IFlinkBatchOptimizeOutputSide getBatchOptimizeOutputSide(CubeSegment seg);
+
+    /**
+     * Participate the batch optimize flow as the output side. Responsible for saving
+     * the cuboid output to storage at the end of Phase 3.
+     *
+     * - Phase 1: Filter Recommended Cuboid Data
+     * - Phase 2: Copy Dictionary & Calculate Statistics & Update Reused Cuboid Shard
+     * - Phase 3: Build Cube
+     * - Phase 4: Cleanup Optimize
+     * - Phase 5: Update Metadata & Cleanup
+     */
+    interface IFlinkBatchOptimizeOutputSide {
+
+        /** Create HTable based on recommended cuboids & statistics. */
+        void addStepPhase2_CreateHTable(DefaultChainedExecutable jobFlow);
+
+        /** Build only missing cuboids. */
+        void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
+
+        /** Cleanup intermediate cuboid data on HDFS. */
+        void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+
+        /** Invoked by Checkpoint job & Cleanup old segments' HTables and related working directory. */
+        void addStepPhase5_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+
+}
diff --git a/pom.xml b/pom.xml
index 6f07d37..a3cd678 100644
--- a/pom.xml
+++ b/pom.xml
@@ -302,6 +302,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.kylin</groupId>
+        <artifactId>kylin-engine-flink</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kylin</groupId>
         <artifactId>kylin-source-hive</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -1334,6 +1339,7 @@
     <module>stream-coordinator</module>
     <module>stream-core</module>
     <module>stream-source-kafka</module>
+    <module>engine-flink</module>
   </modules>
 
   <reporting>