You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by le...@apache.org on 2020/10/14 01:31:05 UTC

[hudi] branch master updated: [HUDI-1328] Introduce HoodieFlinkEngineContext to hudi-flink-client (#2161)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c7d962e  [HUDI-1328] Introduce HoodieFlinkEngineContext to hudi-flink-client (#2161)
c7d962e is described below

commit c7d962efffa9440746d54dd2bcba73a9df8b354a
Author: wangxianghu <wx...@126.com>
AuthorDate: Wed Oct 14 09:30:49 2020 +0800

    [HUDI-1328] Introduce HoodieFlinkEngineContext to hudi-flink-client (#2161)
---
 hudi-client/hudi-flink-client/pom.xml              | 220 +++++++++++++++++++++
 .../client/common/HoodieFlinkEngineContext.java    |  89 +++++++++
 .../client/common/function/FunctionWrapper.java    |  73 +++++++
 .../src/main/resources/log4j.properties            |  23 +++
 .../common/TestHoodieFlinkEngineContext.java       | 115 +++++++++++
 .../src/test/resources/log4j-surefire.properties   |  31 +++
 hudi-client/pom.xml                                |   1 +
 7 files changed, 552 insertions(+)

diff --git a/hudi-client/hudi-flink-client/pom.xml b/hudi-client/hudi-flink-client/pom.xml
new file mode 100644
index 0000000..6fe6482
--- /dev/null
+++ b/hudi-client/hudi-flink-client/pom.xml
@@ -0,0 +1,220 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>hudi-client</artifactId>
+    <groupId>org.apache.hudi</groupId>
+    <version>0.6.1-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>hudi-flink-client</artifactId>
+  <!-- Same version as the parent; the bare parent.version expression is deprecated in Maven 3. -->
+  <version>${project.parent.version}</version>
+
+  <name>hudi-flink-client</name>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <!-- Hudi  -->
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-client-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <!-- Parquet -->
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-avro</artifactId>
+    </dependency>
+
+    <!-- Hoodie - Test -->
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-common</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-client-common</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-hadoop-mr</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- HBase - Tests -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <version>${hbase.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-mapper-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-core-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.xml.bind</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- Hive - Tests -->
+    <dependency>
+      <groupId>${hive.groupid}</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${hive.version}</version>
+      <scope>test</scope>
+      <classifier>${hive.exec.classifier}</classifier>
+    </dependency>
+    <dependency>
+      <groupId>${hive.groupid}</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <version>${hive.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- Test -->
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-engine</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.vintage</groupId>
+      <artifactId>junit-vintage-engine</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-params</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-junit-jupiter</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.platform</groupId>
+      <artifactId>junit-platform-runner</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.platform</groupId>
+      <artifactId>junit-platform-suite-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.platform</groupId>
+      <artifactId>junit-platform-commons</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>scala-compile-first</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>add-source</goal>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>scala-test-compile</id>
+            <phase>process-test-resources</phase>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>compile</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>test-compile</phase>
+          </execution>
+        </executions>
+        <configuration>
+          <skip>false</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+    </resources>
+    <!-- Test resources (e.g. log4j-surefire.properties) belong on the test classpath only,
+         not in the main artifact. -->
+    <testResources>
+      <testResource>
+        <directory>src/test/resources</directory>
+      </testResource>
+    </testResources>
+  </build>
+</project>
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
new file mode 100644
index 0000000..5c52b58
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.common.function.SerializableConsumer;
+import org.apache.hudi.client.common.function.SerializableFunction;
+import org.apache.hudi.client.common.function.SerializablePairFunction;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.util.Option;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.client.common.function.FunctionWrapper.throwingFlatMapWrapper;
+import static org.apache.hudi.client.common.function.FunctionWrapper.throwingForeachWrapper;
+import static org.apache.hudi.client.common.function.FunctionWrapper.throwingMapToPairWrapper;
+import static org.apache.hudi.client.common.function.FunctionWrapper.throwingMapWrapper;
+
+/**
+ * Flink flavour of {@link HoodieEngineContext} that executes work locally with Java streams.
+ *
+ * <p>{@code map} and {@code flatMap} run on a parallel stream, while {@code foreach} and
+ * {@code mapToPair} run sequentially on the calling thread. The {@code parallelism} arguments are
+ * currently not used. Engine properties and job status reporting are not implemented yet.
+ */
+public class HoodieFlinkEngineContext extends HoodieEngineContext {
+
+  /**
+   * Creates a context backed by a fresh, default Hadoop {@link Configuration}.
+   *
+   * @param taskContextSupplier supplier of task-level context information
+   */
+  public HoodieFlinkEngineContext(TaskContextSupplier taskContextSupplier) {
+    this(new SerializableConfiguration(new Configuration()), taskContextSupplier);
+  }
+
+  /**
+   * Creates a context backed by the given Hadoop configuration.
+   *
+   * @param hadoopConf          serializable Hadoop configuration
+   * @param taskContextSupplier supplier of task-level context information
+   */
+  public HoodieFlinkEngineContext(SerializableConfiguration hadoopConf, TaskContextSupplier taskContextSupplier) {
+    super(hadoopConf, taskContextSupplier);
+  }
+
+  @Override
+  public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
+    // Elements are mapped in parallel; collecting to a list keeps the encounter order of `data`.
+    Stream<O> mapped = data.parallelStream().map(throwingMapWrapper(func));
+    return mapped.collect(Collectors.toList());
+  }
+
+  @Override
+  public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
+    Stream<O> flattened = data.parallelStream().flatMap(throwingFlatMapWrapper(func));
+    return flattened.collect(Collectors.toList());
+  }
+
+  @Override
+  public <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism) {
+    // Runs sequentially, in list order, on the calling thread.
+    data.forEach(throwingForeachWrapper(consumer));
+  }
+
+  @Override
+  public <I, K, V> Map<K, V> mapToPair(List<I> data, SerializablePairFunction<I, K, V> func, Integer parallelism) {
+    // Sequential fill of a HashMap: a later pair with an already-seen key replaces the earlier value.
+    Map<K, V> result = new HashMap<>();
+    data.stream()
+        .map(throwingMapToPairWrapper(func))
+        .forEach(pair -> result.put(pair._1, pair._2));
+    return result;
+  }
+
+  @Override
+  public void setProperty(EngineProperty key, String value) {
+    // Not supported yet: the value is discarded.
+  }
+
+  @Override
+  public Option<String> getProperty(EngineProperty key) {
+    // Not supported yet: no property is ever available.
+    return Option.empty();
+  }
+
+  @Override
+  public void setJobStatus(String activeModule, String activityDescription) {
+    // Not supported yet: job status is not reported anywhere.
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java
new file mode 100644
index 0000000..31b11a0
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.common.function;
+
+import org.apache.hudi.exception.HoodieException;
+import scala.Tuple2;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+/**
+ * Function wrapper util class, which catches the exception thrown by input function and return a similar function
+ * with no exception thrown.
+ */
+public class FunctionWrapper {
+
+  public static <I, O> Function<I, O> throwingMapWrapper(SerializableFunction<I, O> throwingMapFunction) {
+    return v1 -> {
+      try {
+        return throwingMapFunction.apply(v1);
+      } catch (Exception e) {
+        throw new HoodieException("Error occurs when executing map", e);
+      }
+    };
+  }
+
+  public static <I, O> Function<I, Stream<O>> throwingFlatMapWrapper(SerializableFunction<I, Stream<O>> throwingFlatMapFunction) {
+    return v1 -> {
+      try {
+        return throwingFlatMapFunction.apply(v1);
+      } catch (Exception e) {
+        throw new HoodieException("Error occurs when executing flatMap", e);
+      }
+    };
+  }
+
+  public static <I> Consumer<I> throwingForeachWrapper(SerializableConsumer<I> throwingConsumer) {
+    return v1 -> {
+      try {
+        throwingConsumer.accept(v1);
+      } catch (Exception e) {
+        throw new HoodieException("Error occurs when executing foreach", e);
+      }
+    };
+  }
+
+  public static <I, K, V> Function<I, Tuple2<K, V>> throwingMapToPairWrapper(SerializablePairFunction<I, K, V> throwingPairFunction) {
+    return v1 -> {
+      try {
+        return throwingPairFunction.call(v1);
+      } catch (Exception e) {
+        throw new HoodieException("Error occurs when executing mapToPair", e);
+      }
+    };
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/resources/log4j.properties b/hudi-client/hudi-flink-client/src/main/resources/log4j.properties
new file mode 100644
index 0000000..ff268fa
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/resources/log4j.properties
@@ -0,0 +1,23 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+log4j.rootLogger=INFO, A1
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/common/TestHoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/common/TestHoodieFlinkEngineContext.java
new file mode 100644
index 0000000..5d752ad
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/common/TestHoodieFlinkEngineContext.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.common;
+
+import org.apache.hudi.common.util.Option;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * Unit tests for {@link HoodieFlinkEngineContext}.
+ */
+public class TestHoodieFlinkEngineContext {
+  private HoodieFlinkEngineContext context;
+
+  @BeforeEach
+  public void init() {
+    context = new HoodieFlinkEngineContext(new DummyTaskContextSupplier());
+  }
+
+  @Test
+  public void testMap() {
+    List<Integer> mapList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    List<Integer> result = context.map(mapList, x -> x + 1, 2);
+
+    // Compare against the full expected list rather than mutating the returned list,
+    // whose modifiability is not part of the map contract.
+    Assertions.assertEquals(Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9, 10, 11), result);
+  }
+
+  @Test
+  public void testFlatMap() {
+    List<String> list1 = Arrays.asList("a", "b", "c");
+    List<String> list2 = Arrays.asList("d", "e", "f");
+    List<String> list3 = Arrays.asList("g", "h", "i");
+
+    List<List<String>> inputList = new ArrayList<>();
+    inputList.add(list1);
+    inputList.add(list2);
+    inputList.add(list3);
+
+    List<String> result = context.flatMap(inputList, Collection::stream, 2);
+
+    Assertions.assertEquals(Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i"), result);
+  }
+
+  @Test
+  public void testForeach() {
+    List<Integer> mapList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    List<Integer> result = new ArrayList<>(10);
+    context.foreach(mapList, result::add, 2);
+
+    Assertions.assertEquals(mapList.size(), result.size());
+    Assertions.assertTrue(result.containsAll(mapList));
+  }
+
+  @Test
+  public void testMapToPair() {
+    List<String> mapList = Arrays.asList("hudi_flink", "hudi_spark");
+
+    Map<String, String> resultMap = context.mapToPair(mapList, x -> {
+      String[] splits = x.split("_");
+      return Tuple2.apply(splits[0], splits[1]);
+    }, 2);
+
+    // Both inputs share the key "hudi", so they collapse into a single entry.
+    Assertions.assertEquals(1, resultMap.size());
+    Assertions.assertNotNull(resultMap.get("hudi"));
+  }
+
+  @Test
+  public void testMapToPairWithDistinctKeys() {
+    List<String> mapList = Arrays.asList("flink_hudi", "spark_hudi");
+
+    Map<String, String> resultMap = context.mapToPair(mapList, x -> {
+      String[] splits = x.split("_");
+      return Tuple2.apply(splits[0], splits[1]);
+    }, 2);
+
+    Assertions.assertEquals(2, resultMap.size());
+    Assertions.assertEquals("hudi", resultMap.get("flink"));
+    Assertions.assertEquals("hudi", resultMap.get("spark"));
+  }
+
+  /**
+   * Minimal {@link TaskContextSupplier}; the id suppliers are not exercised by these tests.
+   */
+  public static class DummyTaskContextSupplier extends TaskContextSupplier {
+
+    @Override
+    public Supplier<Integer> getPartitionIdSupplier() {
+      return null;
+    }
+
+    @Override
+    public Supplier<Integer> getStageIdSupplier() {
+      return null;
+    }
+
+    @Override
+    public Supplier<Long> getAttemptIdSupplier() {
+      return null;
+    }
+
+    @Override
+    public Option<String> getProperty(EngineProperty prop) {
+      // Return an empty Option rather than null so callers can safely query it.
+      return Option.empty();
+    }
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/test/resources/log4j-surefire.properties b/hudi-client/hudi-flink-client/src/test/resources/log4j-surefire.properties
new file mode 100644
index 0000000..32af462
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/test/resources/log4j-surefire.properties
@@ -0,0 +1,31 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+log4j.rootLogger=WARN, CONSOLE
+log4j.logger.org.apache=INFO
+log4j.logger.org.apache.hudi=DEBUG
+log4j.logger.org.apache.hadoop.hbase=ERROR
+
+# CONSOLE is set to be a ConsoleAppender.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+# CONSOLE uses PatternLayout.
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
+log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
+log4j.appender.CONSOLE.filter.a.LevelMin=WARN
+log4j.appender.CONSOLE.filter.a.LevelMax=FATAL
diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml
index 1ab0479..e8ff9e9 100644
--- a/hudi-client/pom.xml
+++ b/hudi-client/pom.xml
@@ -33,5 +33,6 @@
   <modules>
     <module>hudi-client-common</module>
     <module>hudi-spark-client</module>
+    <module>hudi-flink-client</module>
   </modules>
 </project>