You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/08/11 22:50:56 UTC
[pulsar] branch master updated: [pulsar-io-hdfs2] Add config to create subdirectory from current time (#7771)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 569b8f9 [pulsar-io-hdfs2] Add config to create subdirectory from current time (#7771)
569b8f9 is described below
commit 569b8f9fc31bb382a05b6139cd75ed67338028e7
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Aug 12 06:50:44 2020 +0800
[pulsar-io-hdfs2] Add config to create subdirectory from current time (#7771)
### Motivation
Adding a subdirectory associated with the current time will make it easier to process HDFS files in batches.
For example, a user can run multiple sink instances with the `yyyy-MM-dd-hh` pattern and then stop all of them at the start of the next hour. The files in that subdirectory will then contain all messages consumed during that hour.
### Modifications
- Add a `subdirectoryPattern` field to `HdfsSinkConfig`
- Update some simple tests for `HdfsSinkConfig`
- Update the doc of HDFS2 sink
### Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs)
---
.../pulsar/io/hdfs2/sink/HdfsAbstractSink.java | 15 ++++++++++++-
.../pulsar/io/hdfs2/sink/HdfsSinkConfig.java | 26 ++++++++++++++++++----
.../pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java | 3 +++
pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml | 3 ++-
site2/docs/io-hdfs2-sink.md | 5 ++++-
5 files changed, 45 insertions(+), 7 deletions(-)
diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
index dbc5881..1d2096d 100644
--- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
@@ -19,10 +19,13 @@
package org.apache.pulsar.io.hdfs2.sink;
import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -39,6 +42,7 @@ import org.apache.pulsar.io.hdfs2.HdfsResources;
* A Simple abstract class for HDFS sink.
* Users need to implement extractKeyValue function to use this sink.
*/
+@Slf4j
public abstract class HdfsAbstractSink<K, V> extends AbstractHdfsConnector implements Sink<V> {
protected HdfsSinkConfig hdfsSinkConfig;
@@ -46,6 +50,7 @@ public abstract class HdfsAbstractSink<K, V> extends AbstractHdfsConnector imple
protected HdfsSyncThread<V> syncThread;
private Path path;
private FSDataOutputStream hdfsStream;
+ private DateTimeFormatter subdirectoryFormatter;
public abstract KeyValue<K, V> extractKeyValue(Record<V> record);
protected abstract void createWriter() throws IOException;
@@ -56,6 +61,9 @@ public abstract class HdfsAbstractSink<K, V> extends AbstractHdfsConnector imple
hdfsSinkConfig.validate();
connectorConfig = hdfsSinkConfig;
unackedRecords = new LinkedBlockingQueue<Record<V>> (hdfsSinkConfig.getMaxPendingRecords());
+ if (hdfsSinkConfig.getSubdirectoryPattern() != null) {
+ subdirectoryFormatter = DateTimeFormatter.ofPattern(hdfsSinkConfig.getSubdirectoryPattern());
+ }
connectToHdfs();
createWriter();
launchSyncThread();
@@ -99,8 +107,13 @@ public abstract class HdfsAbstractSink<K, V> extends AbstractHdfsConnector imple
ext = getCompressionCodec().getDefaultExtension();
}
- path = new Path(FilenameUtils.concat(hdfsSinkConfig.getDirectory(),
+ String directory = hdfsSinkConfig.getDirectory();
+ if (subdirectoryFormatter != null) {
+ directory = FilenameUtils.concat(directory, LocalDateTime.now().format(subdirectoryFormatter));
+ }
+ path = new Path(FilenameUtils.concat(directory,
hdfsSinkConfig.getFilenamePrefix() + "-" + System.currentTimeMillis() + ext));
+ log.info("Create path: {}", path);
}
return path;
}
diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
index fa52b6a..2af24fc 100644
--- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.Map;
import lombok.Data;
@@ -73,6 +75,14 @@ public class HdfsSinkConfig extends AbstractHdfsConfig implements Serializable {
*/
private int maxPendingRecords = Integer.MAX_VALUE;
+ /**
+ * A subdirectory associated with the created time of the sink.
+ * The pattern is the formatted pattern of {@link AbstractHdfsConfig#getDirectory()}'s subdirectory.
+ *
+ * @see java.time.format.DateTimeFormatter for pattern's syntax
+ */
+ private String subdirectoryPattern;
+
public static HdfsSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), HdfsSinkConfig.class);
@@ -87,16 +97,24 @@ public class HdfsSinkConfig extends AbstractHdfsConfig implements Serializable {
public void validate() {
super.validate();
if ((StringUtils.isEmpty(fileExtension) && getCompression() == null)
- || StringUtils.isEmpty(filenamePrefix)) {
- throw new IllegalArgumentException("Required property not set.");
+ || StringUtils.isEmpty(filenamePrefix)) {
+ throw new IllegalArgumentException("Required property not set.");
}
if (syncInterval < 0) {
- throw new IllegalArgumentException("Sync Interval cannot be negative");
+ throw new IllegalArgumentException("Sync Interval cannot be negative");
}
if (maxPendingRecords < 1) {
- throw new IllegalArgumentException("Max Pending Records must be a positive integer");
+ throw new IllegalArgumentException("Max Pending Records must be a positive integer");
+ }
+
+ if (subdirectoryPattern != null) {
+ try {
+ LocalDateTime.of(2020, 1, 1, 12, 0).format(DateTimeFormatter.ofPattern(subdirectoryPattern));
+ } catch (Exception e) {
+ throw new IllegalArgumentException(subdirectoryPattern + " is not a valid pattern: " + e.getMessage());
+ }
}
}
}
diff --git a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
index d4e1f03..aa76064 100644
--- a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
+++ b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
@@ -44,6 +44,7 @@ public class HdfsSinkConfigTests {
assertEquals("/foo/bar", config.getDirectory());
assertEquals("prefix", config.getFilenamePrefix());
assertEquals(Compression.SNAPPY, config.getCompression());
+ assertEquals("yyyy-MM-dd", config.getSubdirectoryPattern());
}
@Test
@@ -53,6 +54,7 @@ public class HdfsSinkConfigTests {
map.put("directory", "/foo/bar");
map.put("filenamePrefix", "prefix");
map.put("compression", "SNAPPY");
+ map.put("subdirectoryPattern", "yy-MM-dd");
HdfsSinkConfig config = HdfsSinkConfig.load(map);
assertNotNull(config);
@@ -60,6 +62,7 @@ public class HdfsSinkConfigTests {
assertEquals("/foo/bar", config.getDirectory());
assertEquals("prefix", config.getFilenamePrefix());
assertEquals(Compression.SNAPPY, config.getCompression());
+ assertEquals("yy-MM-dd", config.getSubdirectoryPattern());
}
@Test
diff --git a/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml b/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml
index 5a19ee0..47ab4f9 100644
--- a/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml
+++ b/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml
@@ -21,5 +21,6 @@
"hdfsConfigResources": "core-site.xml",
"directory": "/foo/bar",
"filenamePrefix": "prefix",
-"compression": "SNAPPY"
+"compression": "SNAPPY",
+"subdirectoryPattern": "yyyy-MM-dd"
}
\ No newline at end of file
diff --git a/site2/docs/io-hdfs2-sink.md b/site2/docs/io-hdfs2-sink.md
index 56c4c7b..cbeb418 100644
--- a/site2/docs/io-hdfs2-sink.md
+++ b/site2/docs/io-hdfs2-sink.md
@@ -26,6 +26,7 @@ The configuration of the HDFS2 sink connector has the following properties.
| `separator` | char|false |None |The character used to separate records in a text file. <br/><br/>If no value is provided, the contents from all records are concatenated together in one continuous byte array. |
| `syncInterval` | long| false |0| The interval between calls to flush data to HDFS disk in milliseconds. |
| `maxPendingRecords` |int| false|Integer.MAX_VALUE | The maximum number of records that hold in memory before acking. <br/><br/>Setting this property to 1 makes every record send to disk before the record is acked.<br/><br/>Setting this property to a higher value allows buffering records before flushing them to disk.
+| `subdirectoryPattern` | String | false | None | A subdirectory associated with the created time of the sink.<br/>The pattern is the formatted pattern of `directory`'s subdirectory.<br/><br/>See [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html) for pattern's syntax. |
### Example
@@ -39,7 +40,8 @@ Before using the HDFS2 sink connector, you need to create a configuration file t
"directory": "/foo/bar",
"filenamePrefix": "prefix",
"fileExtension": ".log",
- "compression": "SNAPPY"
+ "compression": "SNAPPY",
+ "subdirectoryPattern": "yyyy-MM-dd"
}
```
@@ -52,4 +54,5 @@ Before using the HDFS2 sink connector, you need to create a configuration file t
filenamePrefix: "prefix"
fileExtension: ".log"
compression: "SNAPPY"
+ subdirectoryPattern: "yyyy-MM-dd"
```