You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2016/04/21 11:35:52 UTC
flink git commit: [FLINK-3790] [streaming] Use proper hadoop config in rolling sink
Repository: flink
Updated Branches:
refs/heads/master d636bf78e -> d45cb69af
[FLINK-3790] [streaming] Use proper hadoop config in rolling sink
Closes #1919
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d45cb69a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d45cb69a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d45cb69a
Branch: refs/heads/master
Commit: d45cb69afd38e5dabf64f90e35a21e56a25ddd6f
Parents: d636bf7
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Apr 20 22:22:55 2016 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Thu Apr 21 11:34:30 2016 +0200
----------------------------------------------------------------------
.../streaming/connectors/fs/RollingSink.java | 18 ++++++++++--------
.../connectors/fs/SequenceFileWriter.java | 8 +++++---
2 files changed, 15 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d45cb69a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 76324d7..799e908 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -279,7 +280,9 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
* current part file path, the valid length of the in-progress files and pending part files.
*/
private transient BucketState bucketState;
-
+
+ private transient org.apache.hadoop.conf.Configuration hadoopConf;
+
/**
* Creates a new {@code RollingSink} that writes files to the given base directory.
*
@@ -317,7 +320,8 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
bucketState = new BucketState();
}
- FileSystem fs = new Path(basePath).getFileSystem(new org.apache.hadoop.conf.Configuration());
+ hadoopConf = HadoopFileSystem.getHadoopConfiguration();
+ FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
refTruncate = reflectTruncate(fs);
// delete pending/in-progress files that might be left if we fail while
@@ -412,9 +416,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
private void openNewPartFile() throws Exception {
closeCurrentPartFile();
- org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-
- FileSystem fs = new Path(basePath).getFileSystem(conf);
+ FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
Path newBucketDirectory = bucketer.getNextBucketPath(new Path(basePath));
@@ -466,7 +468,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
if (currentPartPath != null) {
Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix);
Path pendingPath = new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix);
- FileSystem fs = inProgressPath.getFileSystem(new org.apache.hadoop.conf.Configuration());
+ FileSystem fs = inProgressPath.getFileSystem(hadoopConf);
fs.rename(inProgressPath, pendingPath);
LOG.debug("Moving in-progress bucket {} to pending file {}",
inProgressPath,
@@ -541,7 +543,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
Path pendingPath = new Path(finalPath.getParent(),
pendingPrefix + finalPath.getName()).suffix(pendingSuffix);
- FileSystem fs = pendingPath.getFileSystem(new org.apache.hadoop.conf.Configuration());
+ FileSystem fs = pendingPath.getFileSystem(hadoopConf);
fs.rename(pendingPath, finalPath);
LOG.debug(
"Moving pending file {} to final location after complete checkpoint {}.",
@@ -579,7 +581,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
bucketState.pendingFiles.clear();
FileSystem fs = null;
try {
- fs = new Path(basePath).getFileSystem(new org.apache.hadoop.conf.Configuration());
+ fs = new Path(basePath).getFileSystem(HadoopFileSystem.getHadoopConfiguration());
} catch (IOException e) {
LOG.error("Error while creating FileSystem in checkpoint restore.", e);
throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/d45cb69a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
index 32b8d49..c71e97f 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -86,9 +87,11 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> extends
}
CompressionCodec codec = null;
+
+ Configuration conf = HadoopFileSystem.getHadoopConfiguration();
if (!compressionCodecName.equals("None")) {
- CompressionCodecFactory codecFactory = new CompressionCodecFactory(new Configuration());
+ CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
codec = codecFactory.getCodecByName(compressionCodecName);
if (codec == null) {
throw new RuntimeException("Codec " + compressionCodecName + " not found.");
@@ -96,7 +99,7 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> extends
}
// the non-deprecated constructor syntax is only available in recent hadoop versions...
- writer = SequenceFile.createWriter(new Configuration(),
+ writer = SequenceFile.createWriter(conf,
getStream(),
keyClass,
valueClass,
@@ -119,7 +122,6 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> extends
}
@Override
- @SuppressWarnings("unchecked")
public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
if (!type.isTupleType()) {
throw new IllegalArgumentException("Input TypeInformation is not a tuple type.");