Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/23 03:14:14 UTC
[kylin] 01/30: KYLIN-3846 Create Flink engine module and initialize module structure
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 1839a26e24b42fb96e265d7de7423dc74d64fdb0
Author: yanghua <ya...@gmail.com>
AuthorDate: Thu Mar 7 23:50:35 2019 +0800
KYLIN-3846 Create Flink engine module and initialize module structure
---
engine-flink/pom.xml | 145 +++++++++++++++++++++
.../org/apache/kylin/engine/flink/IFlinkInput.java | 60 +++++++++
.../apache/kylin/engine/flink/IFlinkOutput.java | 132 +++++++++++++++++++
pom.xml | 6 +
4 files changed, 343 insertions(+)
diff --git a/engine-flink/pom.xml b/engine-flink/pom.xml
new file mode 100644
index 0000000..04ae57a
--- /dev/null
+++ b/engine-flink/pom.xml
@@ -0,0 +1,145 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>kylin-engine-flink</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache Kylin - Flink Engine</name>
+ <description>Apache Kylin - Flink Engine</description>
+
+ <parent>
+ <artifactId>kylin</artifactId>
+ <groupId>org.apache.kylin</groupId>
+ <version>3.0.0-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <flink.version>1.7.1</flink.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-job</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-engine-mr</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Flink dependency -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_2.11</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-hadoop-compatibility_2.11</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <!-- Hadoop dependency -->
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-model</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ <version>0.9.10</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.3.2</version>
+ <executions>
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>add-source</goal>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
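For context, here is a minimal sketch (not part of this commit) of how the flink-java and flink-hadoop-compatibility dependencies declared above are typically used together: reading a Hadoop SequenceFile into a Flink DataSet. The HDFS path and the Text key/value types are assumptions for illustration only.

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

    public class FlinkHadoopReadSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Read a SequenceFile of <Text, Text> pairs through the Hadoop compatibility layer.
            // The path is a placeholder, not something this commit defines.
            DataSet<Tuple2<Text, Text>> input = env.createInput(
                    HadoopInputs.readHadoopFile(
                            new SequenceFileInputFormat<Text, Text>(),
                            Text.class, Text.class, "hdfs:///tmp/example-sequence-file"));

            System.out.println("records: " + input.count());
        }
    }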
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/IFlinkInput.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/IFlinkInput.java
new file mode 100644
index 0000000..1c9dac0
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/IFlinkInput.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
+
+/**
+ * Flink engine (cubing & merge) input side interface.
+ */
+public interface IFlinkInput {
+
+ /** Return a helper to participate in batch cubing job flow. */
+ IFlinkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
+
+ /** Return a helper to participate in batch cubing merge job flow. */
+ IFlinkBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
+
+ /**
+ * Participates in the batch cubing flow as the input side. Responsible for creating
+ * the intermediate flat table (Phase 1) and cleaning up any leftovers (Phase 4).
+ *
+ * - Phase 1: Create Flat Table
+ * - Phase 2: Build Dictionary (with FlatTableInputFormat)
+ * - Phase 3: Build Cube (with FlatTableInputFormat)
+ * - Phase 4: Update Metadata & Cleanup
+ */
+ interface IFlinkBatchCubingInputSide {
+
+ /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
+ void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
+
+ /** Add step that does the necessary clean-up, such as deleting the intermediate flat table */
+ void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+ }
+
+ interface IFlinkBatchMergeInputSide {
+
+ /** Add step that executes before merge dictionary and before merge cube. */
+ void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+ }
+
+}
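To make the contract above concrete, a hypothetical implementation sketch of IFlinkBatchCubingInputSide follows (not part of this commit; the class name and the steps it would add are illustrative only):

    package org.apache.kylin.engine.flink;

    import org.apache.kylin.job.execution.DefaultChainedExecutable;

    public class ExampleFlinkCubingInputSide implements IFlinkInput.IFlinkBatchCubingInputSide {

        @Override
        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
            // Phase 1: append a step that materializes the intermediate flat table,
            // e.g. a Hive CTAS job wrapped as an executable step.
        }

        @Override
        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
            // Phase 4: append a step that drops the intermediate flat table
            // and removes any temporary files.
        }
    }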
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/IFlinkOutput.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/IFlinkOutput.java
new file mode 100644
index 0000000..15c0a73
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/IFlinkOutput.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+import java.util.List;
+
+/**
+ * Flink engine (cubing & merge) output side interface.
+ */
+public interface IFlinkOutput {
+
+ /** Return a helper to participate in batch cubing job flow. */
+ IFlinkBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg);
+
+ /**
+ * Participates in the batch cubing flow as the output side. Responsible for saving
+ * the cuboid output to storage at the end of Phase 3.
+ *
+ * - Phase 1: Create Flat Table
+ * - Phase 2: Build Dictionary
+ * - Phase 3: Build Cube
+ * - Phase 4: Update Metadata & Cleanup
+ */
+ interface IFlinkBatchCubingOutputSide {
+
+ /** Add step that executes after build dictionary and before build cube. */
+ void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow);
+
+ /**
+ * Add step that saves cuboids from HDFS to storage.
+ *
+ * The cuboid output is a directory of sequence files, where the key is CUBOID+D1+D2+..+Dn
+ * and the value is M1+M2+..+Mm. CUBOID is the 8-byte cuboid ID; Dx is a dictionary-encoded
+ * dimension value; Mx is a serialized measure value.
+ */
+ void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
+
+ /** Add step that does any necessary clean up. */
+ void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+
+ }
+
+ /** Return a helper to participate in batch merge job flow. */
+ IFlinkBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg);
+
+ /**
+ * Participates in the batch merge flow as the output side. Responsible for saving
+ * the cuboid output to storage at the end of Phase 2.
+ *
+ * - Phase 1: Merge Dictionary
+ * - Phase 2: Merge Cube
+ * - Phase 3: Update Metadata & Cleanup
+ */
+ interface IFlinkBatchMergeOutputSide {
+
+ /** Add step that executes after merge dictionary and before merge cube. */
+ void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+ /**
+ * Add step that saves cuboid output from HDFS to storage.
+ *
+ * The cuboid output is a directory of sequence files, where the key is CUBOID+D1+D2+..+Dn
+ * and the value is M1+M2+..+Mm. CUBOID is the 8-byte cuboid ID; Dx is a dictionary-encoded
+ * dimension value; Mx is a serialized measure value.
+ */
+ void addStepPhase2_BuildCube(CubeSegment set, List<CubeSegment> mergingSegments, DefaultChainedExecutable jobFlow);
+
+ /** Add step that does any necessary clean up. */
+ void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+
+ }
+
+ interface IFlinkMergeOutputFormat {
+
+ /** Configure the InputFormat of given job. */
+ void configureJobInput(Job job, String input) throws Exception;
+
+ /** Configure the OutputFormat of given job. */
+ void configureJobOutput(Job job, String output, CubeSegment segment) throws Exception;
+
+ CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube);
+ }
+
+ IFlinkBatchOptimizeOutputSide getBatchOptimizeOutputSide(CubeSegment seg);
+
+ /**
+ * Participates in the batch optimize flow as the output side. Responsible for saving
+ * the cuboid output to storage at the end of Phase 3.
+ *
+ * - Phase 1: Filter Recommended Cuboid Data
+ * - Phase 2: Copy Dictionary & Calculate Statistics & Update Reused Cuboid Shard
+ * - Phase 3: Build Cube
+ * - Phase 4: Cleanup Optimize
+ * - Phase 5: Update Metadata & Cleanup
+ */
+ interface IFlinkBatchOptimizeOutputSide {
+
+ /** Create an HTable based on the recommended cuboids & statistics. */
+ void addStepPhase2_CreateHTable(DefaultChainedExecutable jobFlow);
+
+ /** Build only the missing cuboids. */
+ void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
+
+ /** Clean up intermediate cuboid data on HDFS. */
+ void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+
+ /** Invoked by the Checkpoint job; cleans up old segments' HTables and the related working directory. */
+ void addStepPhase5_Cleanup(DefaultChainedExecutable jobFlow);
+ }
+
+}
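The key layout described in the javadoc above (CUBOID+D1+..+Dn, i.e. an 8-byte cuboid ID followed by dictionary-encoded dimension values) can be illustrated with a small standalone sketch; the dimension byte lengths and values below are made up for the example and are not part of this commit:

    import java.nio.ByteBuffer;

    public class CuboidKeySketch {
        public static void main(String[] args) {
            long cuboidId = 0b111L;                  // hypothetical cuboid covering 3 dimensions
            byte[] d1 = new byte[] { 0x01 };         // dictionary id of dimension 1
            byte[] d2 = new byte[] { 0x00, 0x2A };   // dictionary id of dimension 2
            byte[] d3 = new byte[] { 0x07 };         // dictionary id of dimension 3

            ByteBuffer key = ByteBuffer.allocate(8 + d1.length + d2.length + d3.length);
            key.putLong(cuboidId);                   // CUBOID: 8-byte cuboid ID
            key.put(d1).put(d2).put(d3);             // D1 + D2 + D3: dictionary-encoded values

            System.out.println("key length = " + key.position());   // 8 + 1 + 2 + 1 = 12
        }
    }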
diff --git a/pom.xml b/pom.xml
index 6f07d37..a3cd678 100644
--- a/pom.xml
+++ b/pom.xml
@@ -302,6 +302,11 @@
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-engine-flink</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
<artifactId>kylin-source-hive</artifactId>
<version>${project.version}</version>
</dependency>
@@ -1334,6 +1339,7 @@
<module>stream-coordinator</module>
<module>stream-core</module>
<module>stream-source-kafka</module>
+ <module>engine-flink</module>
</modules>
<reporting>