Posted to commits@flink.apache.org by gy...@apache.org on 2016/04/21 11:35:52 UTC

flink git commit: [FLINK-3790] [streaming] Use proper hadoop config in rolling sink

Repository: flink
Updated Branches:
  refs/heads/master d636bf78e -> d45cb69af


[FLINK-3790] [streaming] Use proper hadoop config in rolling sink

Closes #1919
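
The gist of the fix: a Hadoop Configuration created with the no-argument constructor only picks up the bundled *-default.xml files plus whatever site files happen to be on the classpath, while Flink's HadoopFileSystem.getHadoopConfiguration() also resolves the Hadoop configuration that Flink itself is pointed at (e.g. via the fs.hdfs.hadoopconf setting in flink-conf.yaml). A minimal sketch of the difference, not part of the commit; fs.defaultFS is just an illustrative key:

import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.hadoop.conf.Configuration;

public class HadoopConfSketch {

	public static void main(String[] args) {
		// Sees only the bundled *-default.xml files plus classpath site files.
		Configuration bare = new Configuration();

		// Additionally resolves the Hadoop config directory Flink is
		// configured with, so site-specific settings are present.
		Configuration resolved = HadoopFileSystem.getHadoopConfiguration();

		// On a configured cluster, expect "file:///" vs. an hdfs:// URI here.
		System.out.println("bare:     " + bare.get("fs.defaultFS"));
		System.out.println("resolved: " + resolved.get("fs.defaultFS"));
	}
}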


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d45cb69a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d45cb69a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d45cb69a

Branch: refs/heads/master
Commit: d45cb69afd38e5dabf64f90e35a21e56a25ddd6f
Parents: d636bf7
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Apr 20 22:22:55 2016 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Thu Apr 21 11:34:30 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/fs/RollingSink.java      | 18 ++++++++++--------
 .../connectors/fs/SequenceFileWriter.java         |  8 +++++---
 2 files changed, 15 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d45cb69a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 76324d7..799e908 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -279,7 +280,9 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 	 * current part file path, the valid length of the in-progress files and pending part files.
 	 */
 	private transient BucketState bucketState;
-
+	
+	private transient org.apache.hadoop.conf.Configuration hadoopConf;
+	
 	/**
 	 * Creates a new {@code RollingSink} that writes files to the given base directory.
 	 *
@@ -317,7 +320,8 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 			bucketState = new BucketState();
 		}
 
-		FileSystem fs = new Path(basePath).getFileSystem(new org.apache.hadoop.conf.Configuration());
+		hadoopConf = HadoopFileSystem.getHadoopConfiguration();
+		FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
 		refTruncate = reflectTruncate(fs);
 
 		// delete pending/in-progress files that might be left if we fail while
@@ -412,9 +416,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 	private void openNewPartFile() throws Exception {
 		closeCurrentPartFile();
 
-		org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-
-		FileSystem fs = new Path(basePath).getFileSystem(conf);
+		FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
 
 		Path newBucketDirectory = bucketer.getNextBucketPath(new Path(basePath));
 
@@ -466,7 +468,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 		if (currentPartPath != null) {
 			Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix);
 			Path pendingPath = new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix);
-			FileSystem fs = inProgressPath.getFileSystem(new org.apache.hadoop.conf.Configuration());
+			FileSystem fs = inProgressPath.getFileSystem(hadoopConf);
 			fs.rename(inProgressPath, pendingPath);
 			LOG.debug("Moving in-progress bucket {} to pending file {}",
 					inProgressPath,
@@ -541,7 +543,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 						Path pendingPath = new Path(finalPath.getParent(),
 								pendingPrefix + finalPath.getName()).suffix(pendingSuffix);
 
-						FileSystem fs = pendingPath.getFileSystem(new org.apache.hadoop.conf.Configuration());
+						FileSystem fs = pendingPath.getFileSystem(hadoopConf);
 						fs.rename(pendingPath, finalPath);
 						LOG.debug(
 								"Moving pending file {} to final location after complete checkpoint {}.",
@@ -579,7 +581,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
 		bucketState.pendingFiles.clear();
 		FileSystem fs = null;
 		try {
-			fs = new Path(basePath).getFileSystem(new org.apache.hadoop.conf.Configuration());
+			fs = new Path(basePath).getFileSystem(HadoopFileSystem.getHadoopConfiguration());
 		} catch (IOException e) {
 			LOG.error("Error while creating FileSystem in checkpoint restore.", e);
 			throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e);

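For a sense of the user-visible effect of the RollingSink changes above, here is a hypothetical job skeleton (the socket source, host/port, and output path are placeholders, not from the commit). Before this change the sink built each FileSystem from a bare Configuration; now open() caches the resolved Hadoop configuration and reuses it everywhere, so an hdfs:// base path picks up the cluster's site settings:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.RollingSink;

public class RollingSinkSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Placeholder source; any DataStream<String> works.
		DataStream<String> lines = env.socketTextStream("localhost", 9999);

		// The sink now opens its FileSystem with
		// HadoopFileSystem.getHadoopConfiguration(), so the hdfs:// authority
		// and other site settings come from the cluster's Hadoop config.
		lines.addSink(new RollingSink<String>("hdfs:///flink/rolling-out"));

		env.execute("RollingSink sketch");
	}
}
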
http://git-wip-us.apache.org/repos/asf/flink/blob/d45cb69a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
index 32b8d49..c71e97f 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -86,9 +87,11 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> extends
 		}
 
 		CompressionCodec codec = null;
+		
+		Configuration conf = HadoopFileSystem.getHadoopConfiguration();
 
 		if (!compressionCodecName.equals("None")) {
-			CompressionCodecFactory codecFactory = new CompressionCodecFactory(new Configuration());
+			CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
 			codec = codecFactory.getCodecByName(compressionCodecName);
 			if (codec == null) {
 				throw new RuntimeException("Codec " + compressionCodecName + " not found.");
@@ -96,7 +99,7 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> extends
 		}
 
 		// the non-deprecated constructor syntax is only available in recent hadoop versions...
-		writer = SequenceFile.createWriter(new Configuration(),
+		writer = SequenceFile.createWriter(conf,
 				getStream(),
 				keyClass,
 				valueClass,
@@ -119,7 +122,6 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> extends
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
 		if (!type.isTupleType()) {
 			throw new IllegalArgumentException("Input TypeInformation is not a tuple type.");
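
The SequenceFileWriter hunks matter for the same reason: CompressionCodecFactory can only resolve codec names that the Configuration it receives knows about (the built-in codecs plus anything registered under io.compression.codecs in the site config). A minimal lookup sketch, not part of the commit; "SnappyCodec" is just an illustrative codec name:

import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecLookupSketch {

	public static void main(String[] args) {
		Configuration conf = HadoopFileSystem.getHadoopConfiguration();
		CompressionCodecFactory factory = new CompressionCodecFactory(conf);

		// Returns null if the codec is not registered in this configuration,
		// which is what the "Codec ... not found" exception above guards against.
		CompressionCodec codec = factory.getCodecByName("SnappyCodec");
		System.out.println(codec == null
				? "codec not found"
				: "found codec: " + codec.getClass().getName());
	}
}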