You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/13 15:15:27 UTC

[3/3] flink git commit: [core] Removed redundant common.io.FormatUtil

[core] Removed redundant common.io.FormatUtil


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d5d63f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d5d63f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d5d63f7

Branch: refs/heads/master
Commit: 3d5d63f7f80f3dfb353a86b10cfbed653b24421f
Parents: 61b1c0a
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Jul 12 19:42:49 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jul 13 15:14:41 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/api/common/io/FormatUtil.java  | 186 -------------------
 .../api/common/io/SerializedFormatTest.java     |  21 ++-
 2 files changed, 15 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3d5d63f7/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java
deleted file mode 100644
index 83860a5..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FormatUtil.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.io;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.BlockLocation;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.util.ReflectionUtil;
-
-/**
- * Provides convenience methods to deal with I/O operations related to {@link InputFormat} and {@link OutputFormat}.
- */
-public class FormatUtil {
-
-
-	/**
-	 * Creates an {@link InputFormat} from a given class for the specified file. The optional {@link Configuration}
-	 * initializes the format.
-	 * 
-	 * @param <T>
-	 *        the type of the elements produced by the InputFormat
-	 * @param inputFormatClass
-	 *        the class of the InputFormat
-	 * @param path
-	 *        the path of the file
-	 * @param configuration
-	 *        optional configuration of the InputFormat
-	 * @return the created {@link InputFormat}
-	 * @throws IOException
-	 *         if an I/O error occurred while accessing the file or initializing the InputFormat.
-	 */
-	public static <T, F extends FileInputFormat<T>> F openInput(
-			Class<F> inputFormatClass, String path, Configuration configuration)
-		throws IOException
-	{
-		configuration = configuration == null ? new Configuration() : configuration;
-
-		Path normalizedPath = normalizePath(new Path(path));
-		final F inputFormat = ReflectionUtil.newInstance(inputFormatClass);
-
-		inputFormat.setFilePath(normalizedPath);
-		inputFormat.setOpenTimeout(0);
-		inputFormat.configure(configuration);
-
-		final FileSystem fs = FileSystem.get(normalizedPath.toUri());
-		FileStatus fileStatus = fs.getFileStatus(normalizedPath);
-
-		BlockLocation[] blocks = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
-		inputFormat.open(new FileInputSplit(0, new Path(path), 0, fileStatus.getLen(), blocks[0].getHosts()));
-		return inputFormat;
-	}
-
-	/**
-	 * Creates {@link InputFormat}s from a given class for the specified file(s). The optional {@link Configuration}
-	 * initializes the formats.
-	 * 
-	 * @param <T>
-	 *        the type of the elements produced by the InputFormat
-	 * @param inputFormatClass
-	 *        the class of the InputFormat
-	 * @param path
-	 *        the path of the file or to the directory containing the splits
-	 * @param configuration
-	 *        optional configuration of the InputFormat
-	 * @return the created {@link InputFormat}s for each file in the specified path
-	 * @throws IOException
-	 *         if an I/O error occurred while accessing the files or initializing the InputFormat.
-	 */
-	@SuppressWarnings("unchecked")
-	public static <T, F extends FileInputFormat<T>> List<F> openAllInputs(
-			Class<F> inputFormatClass, String path, Configuration configuration) throws IOException {
-		Path nephelePath = new Path(path);
-		FileSystem fs = nephelePath.getFileSystem();
-		FileStatus fileStatus = fs.getFileStatus(nephelePath);
-		if (!fileStatus.isDir()) {
-			return Arrays.asList(openInput(inputFormatClass, path, configuration));
-		}
-		FileStatus[] list = fs.listStatus(nephelePath);
-		List<F> formats = new ArrayList<F>();
-		for (int index = 0; index < list.length; index++) {
-			formats.add(openInput(inputFormatClass, list[index].getPath().toString(), configuration));
-		}
-		return formats;
-	}
-
-	/**
-	 * Creates an {@link InputFormat} from a given class. The optional {@link Configuration}
-	 * initializes the format.
-	 * 
-	 * @param <T>
-	 *        the type of the elements produced by the InputFormat
-	 * @param inputFormatClass
-	 *        the class of the InputFormat
-	 * @param configuration
-	 *        optional configuration of the InputFormat
-	 * @return the created {@link InputFormat}
-	 * @throws IOException
-	 *         if an I/O error occurred while initializing or opening the InputFormat.
-	 */
-	public static <T, IS extends InputSplit, F extends InputFormat<T, IS>> F openInput(
-			Class<F> inputFormatClass, Configuration configuration) throws IOException {
-		configuration = configuration == null ? new Configuration() : configuration;
-
-		final F inputFormat = ReflectionUtil.newInstance(inputFormatClass);
-		inputFormat.configure(configuration);
-		final IS[] splits = inputFormat.createInputSplits(1);
-		inputFormat.open(splits[0]);
-		return inputFormat;
-	}
-	
-	/**
-	 * Creates an {@link OutputFormat} from a given class for the specified file. The optional {@link Configuration}
-	 * initializes the format.
-	 * 
-	 * @param <T>
-	 *        an upper bound on the element type of the OutputFormat
-	 * @param outputFormatClass
-	 *        the class of the OutputFormat
-	 * @param path
-	 *        the path of the output file
-	 * @param configuration
-	 *        optional configuration of the OutputFormat
-	 * @return the created {@link OutputFormat}
-	 * @throws IOException
-	 *         if an I/O error occurred while accessing the file or initializing the OutputFormat.
-	 */
-	public static <T, F extends FileOutputFormat<? extends T>> F openOutput(
-			Class<F> outputFormatClass, String path, Configuration configuration) 
-		throws IOException
-	{
-		final F outputFormat = ReflectionUtil.newInstance(outputFormatClass);
-		outputFormat.setOutputFilePath(new Path(path));
-		outputFormat.setWriteMode(WriteMode.OVERWRITE);
-	
-		configuration = configuration == null ? new Configuration() : configuration;
-		
-		outputFormat.configure(configuration);
-		outputFormat.open(0, 1);
-		return outputFormat;
-	}
-
-	/**
-	 * Fixes the path if it denotes a local (relative) file without the proper protocol prefix.
-	 */
-	private static Path normalizePath(Path path) {
-		URI uri = path.toUri();
-		if (uri.getScheme() == null) {
-			try {
-				uri = new URI("file", uri.getHost(), uri.getPath(), uri.getFragment());
-				path = new Path(uri.toString());
-			} catch (URISyntaxException e) {
-				throw new IllegalArgumentException("path is invalid", e);
-			}
-		}
-		return path;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3d5d63f7/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
index e421f4f..d82623b 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
@@ -19,10 +19,13 @@
 package org.apache.flink.api.common.io;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.runner.RunWith;
@@ -56,13 +59,19 @@ public class SerializedFormatTest extends SequentialFormatTestBase<Record> {
 		return inputFormat;
 	}
 
-
-	@SuppressWarnings({ "unchecked", "rawtypes" })
+	
 	@Override
-	protected BinaryOutputFormat<Record> createOutputFormat(String path, Configuration
-			configuration) throws IOException {
-		return FormatUtil.<Record, SerializedOutputFormat>openOutput
-				(SerializedOutputFormat.class, path, configuration);
+	protected BinaryOutputFormat<Record> createOutputFormat(String path, Configuration configuration)
+			throws IOException
+	{
+		final SerializedOutputFormat<Record> outputFormat = new SerializedOutputFormat<Record>();
+		outputFormat.setOutputFilePath(new Path(path));
+		outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+
+		configuration = configuration == null ? new Configuration() : configuration;
+		outputFormat.configure(configuration);
+		outputFormat.open(0, 1);
+		return outputFormat;
 	}
 
 	@Override