You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/04/04 01:57:44 UTC
[incubator-hudi] branch master updated: HUDI-644 kafka connect
checkpoint provider (#1453)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 575d87c HUDI-644 kafka connect checkpoint provider (#1453)
575d87c is described below
commit 575d87cf7d6f0f743cb7cec6520d80e6fcc3e20a
Author: YanJia-Gary-Li <ya...@gmail.com>
AuthorDate: Fri Apr 3 18:57:34 2020 -0700
HUDI-644 kafka connect checkpoint provider (#1453)
---
.../checkpointing/InitialCheckPointProvider.java | 31 +++++
.../checkpointing/KafkaConnectHdfsProvider.java | 152 +++++++++++++++++++++
.../TestKafkaConnectHdfsProvider.java | 94 +++++++++++++
3 files changed, 277 insertions(+)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java
new file mode 100644
index 0000000..741b05c
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.checkpointing;
+
+import org.apache.hudi.exception.HoodieException;
+
+/**
+ * Provide the initial checkpoint for delta streamer.
+ */
+public interface InitialCheckPointProvider {
+ /**
+ * Get checkpoint string recognizable for delta streamer.
+ *
+ * @return the checkpoint string; its exact format is defined by the implementation
+ * @throws HoodieException if the implementation cannot produce a checkpoint
+ */
+ String getCheckpoint() throws HoodieException;
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/KafkaConnectHdfsProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/KafkaConnectHdfsProvider.java
new file mode 100644
index 0000000..f464f68
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/KafkaConnectHdfsProvider.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.checkpointing;
+
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Generate checkpoint from Kafka-Connect-HDFS managed data set.
+ * Documentation: https://docs.confluent.io/current/connect/kafka-connect-hdfs/index.html
+ *
+ * <p>The provider walks the partition directories below the base path, collects every file
+ * whose name follows the {@code topic+partition+lowerOffset+upperOffset[.ext]} convention and
+ * reports, for each Kafka partition, the largest upper offset found.
+ */
+public class KafkaConnectHdfsProvider implements InitialCheckPointProvider {
+  private final Path path;
+  private final FileSystem fs;
+
+  /**
+   * @param basePath   root directory of the data set to scan
+   * @param fileSystem file system used to list {@code basePath}
+   */
+  public KafkaConnectHdfsProvider(final Path basePath, final FileSystem fileSystem) {
+    this.path = basePath;
+    this.fs = fileSystem;
+  }
+
+  /**
+   * PathFilter for Kafka-Connect-HDFS.
+   * Directory format: /partition1=xxx/partition2=xxx
+   * File format: topic+partition+lowerOffset+upperOffset.file
+   */
+  public static class KafkaConnectPathFilter implements PathFilter {
+    private static final Pattern DIRECTORY_PATTERN = Pattern.compile(".*=.*");
+    // Capturing groups: 1 = topic, 2 = partition, 3 = lower offset, 4 = upper offset,
+    // 5 = optional extension. The extension dot is escaped so that only a literal '.'
+    // may separate the upper offset from the extension.
+    private static final Pattern PATTERN =
+        Pattern.compile("([a-zA-Z0-9\\._\\-]+)\\+(\\d+)\\+(\\d+)\\+(\\d+)(\\.\\w+)?");
+
+    /**
+     * @param path path whose last component is tested
+     * @return true if the name follows the Kafka-Connect-HDFS file naming convention
+     */
+    @Override
+    public boolean accept(final Path path) {
+      return PATTERN.matcher(path.getName()).matches();
+    }
+
+    /**
+     * @param path path whose last component is tested
+     * @return true if the name looks like a {@code key=value} partition directory
+     */
+    public boolean acceptDir(final Path path) {
+      return DIRECTORY_PATTERN.matcher(path.getName()).matches();
+    }
+  }
+
+  /**
+   * Convert map contains max offset of each partition to string.
+   * The caller must ensure that partitions {@code 0 .. checkpoint.size() - 1} are all present.
+   * @param topic Topic name
+   * @param checkpoint Map with partition as key and max offset as value
+   * @return Checkpoint string in the form {@code topic,0:offset0,1:offset1,...}
+   */
+  private static String buildCheckpointStr(final String topic,
+                                           final HashMap<Integer, Long> checkpoint) {
+    final StringBuilder checkpointStr = new StringBuilder();
+    checkpointStr.append(topic);
+    for (int i = 0; i < checkpoint.size(); ++i) {
+      checkpointStr.append(",").append(i).append(":").append(checkpoint.get(i));
+    }
+    return checkpointStr.toString();
+  }
+
+  /**
+   * List file status recursively.
+   * Only directories accepted by {@link KafkaConnectPathFilter#acceptDir(Path)} are descended
+   * into; other directories are skipped entirely.
+   * @param curPath Current Path
+   * @param filter PathFilter
+   * @return All file status match kafka connect naming convention
+   * @throws IOException if listing a directory fails
+   */
+  private ArrayList<FileStatus> listAllFileStatus(Path curPath,
+                                                  KafkaConnectPathFilter filter) throws IOException {
+    final ArrayList<FileStatus> allFileStatus = new ArrayList<>();
+    for (final FileStatus status : this.fs.listStatus(curPath)) {
+      if (status.isDirectory()) {
+        if (filter.acceptDir(status.getPath())) {
+          allFileStatus.addAll(listAllFileStatus(status.getPath(), filter));
+        }
+        // A directory is never a data file, even if its name resembles one.
+      } else if (filter.accept(status.getPath())) {
+        allFileStatus.add(status);
+      }
+    }
+    return allFileStatus;
+  }
+
+  /**
+   * Scan the base path and build the checkpoint from the file names found.
+   *
+   * @return checkpoint string {@code topic,0:maxOffset0,1:maxOffset1,...}; the topic is taken
+   *         from the first matching file
+   * @throws HoodieException if listing fails, no matching file exists, a file name carries a
+   *         partition or offset that cannot be parsed, or the partitions found are not
+   *         contiguous starting from 0
+   */
+  @Override
+  public String getCheckpoint() throws HoodieException {
+    final KafkaConnectPathFilter filter = new KafkaConnectPathFilter();
+    final ArrayList<FileStatus> fileStatus;
+    try {
+      fileStatus = listAllFileStatus(this.path, filter);
+    } catch (IOException e) {
+      // Keep the original exception as the cause so the stack trace is not lost.
+      throw new HoodieException("Failed to list Kafka Connect Hdfs files under: " + this.path, e);
+    }
+    if (fileStatus.isEmpty()) {
+      throw new HoodieException("No valid Kafka Connect Hdfs file found under:" + this.path.getName());
+    }
+    String topic = null;
+    int maxPartition = -1;
+    // Offsets are kept as long: Kafka offsets are 64-bit and can exceed Integer.MAX_VALUE.
+    final HashMap<Integer, Long> checkpointMap = new HashMap<>();
+    for (final FileStatus status : fileStatus) {
+      final String filename = status.getPath().getName();
+      // Use the capturing groups instead of splitting on '+' and '.', because a topic name
+      // may itself contain dots.
+      final Matcher matcher = KafkaConnectPathFilter.PATTERN.matcher(filename);
+      if (!matcher.matches()) {
+        // Not expected: the listing above only keeps names accepted by the same pattern.
+        throw new HoodieException("Unexpected Kafka Connect Hdfs file name: " + filename);
+      }
+      int partition;
+      long offsetUpper;
+      try {
+        partition = Integer.parseInt(matcher.group(2));
+        offsetUpper = Long.parseLong(matcher.group(4));
+      } catch (NumberFormatException e) {
+        throw new HoodieException("Partition or offset out of range in file name: " + filename, e);
+      }
+      if (topic == null) {
+        topic = matcher.group(1);
+      }
+      maxPartition = Math.max(maxPartition, partition);
+      final Long knownOffset = checkpointMap.get(partition);
+      if (knownOffset == null || knownOffset < offsetUpper) {
+        checkpointMap.put(partition, offsetUpper);
+      }
+    }
+    // Partitions are numbered from 0, so a complete scan yields exactly maxPartition + 1 keys.
+    if (checkpointMap.size() != maxPartition + 1) {
+      throw new HoodieException("Missing partition from the file scan, "
+          + "max partition found(start from 0): "
+          + maxPartition
+          + " total partitions number appear in "
+          + this.path.getName()
+          + " is: "
+          + checkpointMap.size()
+          + " total partitions number expected: "
+          + (maxPartition + 1));
+    }
+    return buildCheckpointStr(topic, checkpointMap);
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java
new file mode 100644
index 0000000..fed8e46
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.checkpointing;
+
+import org.apache.hudi.common.HoodieCommonTestHarness;
+import org.apache.hudi.common.model.HoodieTestUtils;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link KafkaConnectHdfsProvider}.
+ */
+public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness {
+  private FileSystem fs = null;
+  private String topicPath = null;
+
+  @Before
+  public void init() {
+    // Prepare directories
+    initPath();
+    final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
+    fs = FSUtils.getFs(basePath, hadoopConf);
+  }
+
+  /**
+   * Create a directory (and any missing parents), failing loudly if that is not possible.
+   */
+  private static void createDir(final String dirPath) {
+    final File dir = new File(dirPath);
+    if (!dir.mkdirs() && !dir.isDirectory()) {
+      throw new IllegalStateException("Could not create test directory: " + dirPath);
+    }
+  }
+
+  /**
+   * Create an empty file, failing loudly if that is not possible. Without this check a broken
+   * fixture would go unnoticed and a test could pass or fail for the wrong reason.
+   */
+  private static void createFile(final String filePath) throws Exception {
+    final File file = new File(filePath);
+    if (!file.createNewFile() && !file.isFile()) {
+      throw new IllegalStateException("Could not create test file: " + filePath);
+    }
+  }
+
+  @Test
+  public void testValidKafkaConnectPath() throws Exception {
+    // a standard format(time based partition) of the files managed by kafka connect is:
+    // topic/year=xxx/month=xxx/day=xxx/topic+partition+lowerOffset+upperOffset.file
+    topicPath = basePath + "/topic1";
+    createDir(topicPath);
+    // create regular kafka connect hdfs dirs
+    createDir(topicPath + "/year=2016/month=05/day=01/");
+    createDir(topicPath + "/year=2016/month=05/day=02/");
+    // kafka connect tmp folder
+    createDir(topicPath + "/TMP");
+    // tmp file that being written
+    createFile(topicPath + "/TMP/" + "topic1+0+301+400.parquet");
+    // regular parquet files
+    createFile(topicPath + "/year=2016/month=05/day=01/"
+        + "topic1+0+100+200.parquet");
+    createFile(topicPath + "/year=2016/month=05/day=01/"
+        + "topic1+1+100+200.parquet");
+    createFile(topicPath + "/year=2016/month=05/day=02/"
+        + "topic1+0+201+300.parquet");
+    // noise parquet file
+    createFile(topicPath + "/year=2016/month=05/day=01/"
+        + "random_snappy_1.parquet");
+    createFile(topicPath + "/year=2016/month=05/day=02/"
+        + "random_snappy_2.parquet");
+    InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(new Path(topicPath), fs);
+    // expected value first, actual value second
+    assertEquals("topic1,0:300,1:200", provider.getCheckpoint());
+  }
+
+  @Test(expected = HoodieException.class)
+  public void testMissingPartition() throws Exception {
+    topicPath = basePath + "/topic2";
+    createDir(topicPath);
+    // create regular kafka connect hdfs dirs
+    createDir(topicPath + "/year=2016/month=05/day=01/");
+    createDir(topicPath + "/year=2016/month=05/day=02/");
+    // parquet files with missing partition
+    createFile(topicPath + "/year=2016/month=05/day=01/"
+        + "topic1+0+100+200.parquet");
+    createFile(topicPath + "/year=2016/month=05/day=01/"
+        + "topic1+2+100+200.parquet");
+    createFile(topicPath + "/year=2016/month=05/day=02/"
+        + "topic1+0+201+300.parquet");
+    InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(new Path(topicPath), fs);
+    provider.getCheckpoint();
+  }
+}