You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by rm...@apache.org on 2016/06/21 08:37:26 UTC
[1/2] incubator-beam git commit: fixing build on windows
Repository: incubator-beam
Updated Branches:
refs/heads/BEAM-357_windows-build-fails [created] 460d21cb7
fixing build on windows
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41883300
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41883300
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41883300
Branch: refs/heads/BEAM-357_windows-build-fails
Commit: 418833001fe6dd581f42f7fcc3c35ef36f292007
Parents: 0e4d0a9
Author: Romain manni-Bucau <rm...@gmail.com>
Authored: Sun Jun 19 21:19:57 2016 +0200
Committer: Romain manni-Bucau <rm...@gmail.com>
Committed: Sun Jun 19 21:19:57 2016 +0200
----------------------------------------------------------------------
.../beam/runners/flink/WriteSinkITCase.java | 13 +
.../beam/runners/spark/SimpleWordCountTest.java | 8 +
.../beam/runners/spark/io/AvroPipelineTest.java | 7 +
.../beam/runners/spark/io/NumShardsTest.java | 7 +
.../io/hadoop/HadoopFileFormatPipelineTest.java | 7 +
.../translation/TransformTranslatorTest.java | 7 +
.../src/main/resources/beam/checkstyle.xml | 4 +-
.../org/apache/beam/sdk/io/FileBasedSink.java | 7 +-
.../beam/sdk/testing/HadoopWorkarounds.java | 129 +++++++++
sdks/java/io/hdfs/pom.xml | 9 +
.../beam/sdk/io/hdfs/HDFSFileSourceTest.java | 264 ++++++++++---------
sdks/java/maven-archetypes/starter/pom.xml | 3 +
12 files changed, 334 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
index 36d3aef..1a56350 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.test.util.JavaProgramTestBase;
import java.io.File;
+import java.io.IOException;
import java.io.PrintWriter;
import java.net.URI;
@@ -75,6 +76,18 @@ public class WriteSinkITCase extends JavaProgramTestBase {
p.run();
}
+
+  /**
+   * Stops the test cluster, tolerating only the file deletion failure of the cleanup phase.
+   *
+   * <p>An {@link IOException} whose message starts with "Unable to delete file" is ignored;
+   * any other {@link IOException} (including one without a message) is rethrown so that
+   * genuine shutdown failures are still reported.
+   *
+   * @throws Exception if stopping the cluster fails for any other reason
+   */
+  @Override
+  public void stopCluster() throws Exception {
+    try {
+      super.stopCluster();
+    } catch (final IOException ioe) {
+      final String message = ioe.getMessage();
+      // getMessage() may be null, and only the deletion failure is expected here
+      if (message == null || !message.startsWith("Unable to delete file")) {
+        throw ioe;
+      }
+      // that's ok for the test itself, just the OS playing with us on cleanup phase
+    }
+  }
+
/**
* Simple custom sink which writes to a file.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index 2b4464d..4980995 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.HadoopWorkarounds;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
@@ -40,11 +41,13 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.commons.io.FileUtils;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
+import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
@@ -61,6 +64,11 @@ public class SimpleWordCountTest {
private static final Set<String> EXPECTED_COUNT_SET =
ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
+ /**
+ * Calls {@code HadoopWorkarounds.winTests()} once, before any test of this class runs.
+ */
+ @BeforeClass
+ public static void initWin() throws IOException {
+ HadoopWorkarounds.winTests();
+ }
+
@Test
public void testInMem() throws Exception {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index f358878..f6d0d55 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -25,6 +25,7 @@ import org.apache.beam.runners.spark.SparkPipelineRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.HadoopWorkarounds;
import org.apache.beam.sdk.values.PCollection;
import com.google.common.collect.Lists;
@@ -38,6 +39,7 @@ import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -58,6 +60,11 @@ public class AvroPipelineTest {
@Rule
public final TemporaryFolder tmpDir = new TemporaryFolder();
+ /**
+ * Calls {@code HadoopWorkarounds.winTests()} once, before any test of this class runs.
+ */
+ @BeforeClass
+ public static void initWin() throws IOException {
+ HadoopWorkarounds.winTests();
+ }
+
@Before
public void setUp() throws IOException {
inputFile = tmpDir.newFile("test.avro");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 23d4592..8a864c4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.HadoopWorkarounds;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
@@ -38,6 +39,7 @@ import com.google.common.collect.Sets;
import com.google.common.io.Files;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -64,6 +66,11 @@ public class NumShardsTest {
@Rule
public final TemporaryFolder tmpDir = new TemporaryFolder();
+ /**
+ * Calls {@code HadoopWorkarounds.winTests()} once, before any test of this class runs.
+ */
+ @BeforeClass
+ public static void initWin() throws IOException {
+ HadoopWorkarounds.winTests();
+ }
+
@Before
public void setUp() throws IOException {
outputDir = tmpDir.newFolder("out");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index eaa508c..767682e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.runners.spark.coders.WritableCoder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.HadoopWorkarounds;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -58,6 +60,11 @@ public class HadoopFileFormatPipelineTest {
@Rule
public final TemporaryFolder tmpDir = new TemporaryFolder();
+ /**
+ * Calls {@code HadoopWorkarounds.winTests()} once, before any test of this class runs.
+ */
+ @BeforeClass
+ public static void initWin() throws IOException {
+ HadoopWorkarounds.winTests();
+ }
+
@Before
public void setUp() throws IOException {
inputFile = tmpDir.newFile("test.seq");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index b593316..fec0dc9 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -28,10 +28,12 @@ import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.testing.HadoopWorkarounds;
import org.apache.beam.sdk.values.PCollection;
import com.google.common.base.Charsets;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -51,6 +53,11 @@ import java.util.List;
public class TransformTranslatorTest {
@Rule public TemporaryFolder tmp = new TemporaryFolder();
+ /**
+ * Calls {@code HadoopWorkarounds.winTests()} once, before any test of this class runs.
+ */
+ @BeforeClass
+ public static void initWin() throws IOException {
+ HadoopWorkarounds.winTests();
+ }
+
/**
* Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline
* in DirectRunner and on SparkPipelineRunner, with the mapped dataflow-to-spark
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
index 311f599..457675a 100644
--- a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
@@ -29,7 +29,9 @@ page at http://checkstyle.sourceforge.net/config.html -->
<!-- Checks that there are no tab characters in the file. -->
</module>
- <module name="NewlineAtEndOfFile"/>
+ <module name="NewlineAtEndOfFile">
+ <property name="lineSeparator" value="lf" />
+ </module>
<module name="RegexpSingleline">
<!-- Checks that TODOs don't have stuff in parenthesis, e.g., username. -->
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 521f54b..045d6ad 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -35,6 +35,7 @@ import com.google.common.collect.Ordering;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.channels.WritableByteChannel;
@@ -645,7 +646,11 @@ public abstract class FileBasedSink<T> extends Sink<T> {
private void copyOne(String source, String destination) throws IOException {
try {
// Copy the source file, replacing the existing destination.
- Files.copy(Paths.get(source), Paths.get(destination), StandardCopyOption.REPLACE_EXISTING);
+ // Paths.get(x) will not work on win cause of the ":" after the drive letter
+ Files.copy(
+ new File(source).toPath(),
+ new File(destination).toPath(),
+ StandardCopyOption.REPLACE_EXISTING);
} catch (NoSuchFileException e) {
LOG.debug("{} does not exist.", source);
// Suppress exception if file does not exist.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
new file mode 100644
index 0000000..ee2e135
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.commons.compress.utils.IOUtils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+/**
+ * A simple helper ensuring {@code winutils.exe} can be found by hadoop in the current JVM.
+ */
+public class HadoopWorkarounds {
+  /**
+   * Downloads {@code winutils.exe} into {@code hadoopHome/bin} and sets the
+   * {@code hadoop.home.dir} system property to {@code hadoopHome}.
+   *
+   * <p>Does nothing when the {@code os.name} system property does not start with
+   * "Windows" or when {@code hadoop.home.dir} is already set.
+   *
+   * <p>In practice this method only needs to be called once per JVM
+   * since hadoop uses static variables to store it.
+   *
+   * <p>Note: ensure invocation is done before hadoop reads it
+   * and ensure this folder survives tests
+   * (avoid temporary folder usage since tests can share it).
+   *
+   * @param hadoopHome where to fake hadoop home.
+   * @throws IllegalStateException if {@code hadoopHome/bin} cannot be created
+   *     or if downloading {@code winutils.exe} fails.
+   */
+  public static void win(final File hadoopHome) {
+    // if (Shell.osType != Shell.OSType.OS_TYPE_WIN) { // don't do that to not load Shell yet
+    if (!System.getProperty("os.name", "").startsWith("Windows")
+        || System.getProperty("hadoop.home.dir") != null) {
+      return;
+    }
+
+    // hadoop doesn't have winutils.exe :(: https://issues.apache.org/jira/browse/HADOOP-10051
+    // so use this github repo temporarily then just use the main tar.gz
+    /*
+    String hadoopVersion = VersionInfo.getVersion();
+    final URL url = new URL("https://archive.apache.org/dist/hadoop/common/
+      hadoop-" + hadoopVersion + "/hadoop-" + hadoopVersion + ".tar.gz");
+    final File hadoopTar = tmpFolder.newFile();
+    try (final InputStream is = new GZIPInputStream(url.openStream());
+       final OutputStream os = new FileOutputStream(hadoopTar)) {
+      System.out.println("Downloading Hadoop in " + hadoopTar + ", " +
+          "this can take a while, if you have it locally " +
+          "maybe set \"hadoop.home.dir\" system property");
+      IOUtils.copyLarge(is, os, new byte[1024 * 1024]);
+    }
+
+    final File hadoopHome = tmpFolder.newFolder();
+    try (final ArchiveInputStream stream = new TarArchiveInputStream(
+        new FileInputStream(hadoopTar))) {
+      ArchiveEntry entry;
+      while ((entry = stream.getNextEntry()) != null) {
+        if (entry.isDirectory()) {
+          FileUtils.forceMkdir(new File(hadoopHome, entry.getName()));
+          continue;
+        }
+        final File out = new File(hadoopHome, entry.getName());
+        FileUtils.forceMkdir(out.getParentFile());
+        try (final OutputStream os = new FileOutputStream(out)) {
+          IOUtils.copy(stream, os);
+        }
+      }
+    }
+
+    final String hadoopRoot = "hadoop-" + hadoopVersion;
+    final File[] files = hadoopHome.listFiles(new FileFilter() {
+      @Override
+      public boolean accept(final File pathname) {
+        return pathname.isDirectory() && pathname.getName().equals(hadoopRoot);
+      }
+    });
+    if (files == null || files.length != 1) {
+      throw new IllegalStateException("Didn't find hadoop in " + hadoopHome);
+    }
+    System.setProperty("hadoop.home.dir", files[0].getAbsolutePath());
+    */
+
+    System.out.println("You are on windows (sorry) and you don't set "
+        + "-Dhadoop.home.dir so we'll download winutils.exe");
+
+    final File binDir = new File(hadoopHome, "bin");
+    // mkdirs() also returns false when the directory already exists, hence the second check
+    if (!binDir.mkdirs() && !binDir.isDirectory()) {
+      throw new IllegalStateException("Can't create " + binDir.getAbsolutePath());
+    }
+
+    final URL url;
+    try {
+      url = new URL("https://github.com/steveloughran/winutils/"
+          + "raw/master/hadoop-2.7.1/bin/winutils.exe");
+    } catch (final MalformedURLException e) { // unlikely
+      throw new IllegalArgumentException(e);
+    }
+
+    // any failure while opening, copying or closing the streams is reported the same way
+    try (final InputStream is = url.openStream();
+        final OutputStream os = new FileOutputStream(new File(binDir, "winutils.exe"))) {
+      IOUtils.copy(is, os, 1024 * 1024);
+    } catch (final IOException e) {
+      throw new IllegalStateException(e);
+    }
+    System.setProperty("hadoop.home.dir", hadoopHome.getAbsolutePath());
+  }
+
+  /**
+   * Just a convenient {@link #win(File)} invocation for tests,
+   * using the relative folder {@code target/hadoop-win} as hadoop home.
+   */
+  public static void winTests() {
+    win(new File("target/hadoop-win"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index 9c30792..f8e3c14 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -83,5 +83,14 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+
+ <!-- see the commented-out block in HadoopWorkarounds
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>1.9</version>
+ <scope>test</scope>
+ </dependency>
+ -->
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
index 67df7bc..2ce1af7 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
@@ -28,9 +28,9 @@ import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.HadoopWorkarounds;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.values.KV;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
@@ -38,6 +38,7 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -53,138 +54,143 @@ import java.util.Random;
*/
public class HDFSFileSourceTest {
- Random random = new Random(0L);
-
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
-
- @Test
- public void testFullyReadSingleFile() throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
- List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
- File file = createFileWithData("tmp.seq", expectedResults);
-
- HDFSFileSource<IntWritable, Text> source =
- HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
- IntWritable.class, Text.class);
-
- assertEquals(file.length(), source.getEstimatedSizeBytes(null));
-
- assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
- }
-
- @Test
- public void testFullyReadFilePattern() throws IOException {
- PipelineOptions options = PipelineOptionsFactory.create();
- List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
- File file1 = createFileWithData("file1", data1);
-
- List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
- createFileWithData("file2", data2);
-
- List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
- createFileWithData("file3", data3);
-
- List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
- createFileWithData("otherfile", data4);
-
- HDFSFileSource<IntWritable, Text> source =
- HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
- SequenceFileInputFormat.class, IntWritable.class, Text.class);
- List<KV<IntWritable, Text>> expectedResults = new ArrayList<>();
- expectedResults.addAll(data1);
- expectedResults.addAll(data2);
- expectedResults.addAll(data3);
- assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
- }
-
- @Test
- public void testCloseUnstartedFilePatternReader() throws IOException {
- PipelineOptions options = PipelineOptionsFactory.create();
- List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
- File file1 = createFileWithData("file1", data1);
-
- List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
- createFileWithData("file2", data2);
-
- List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
- createFileWithData("file3", data3);
-
- List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
- createFileWithData("otherfile", data4);
-
- HDFSFileSource<IntWritable, Text> source =
- HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
- SequenceFileInputFormat.class, IntWritable.class, Text.class);
- Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options);
- // Closing an unstarted FilePatternReader should not throw an exception.
- try {
- reader.close();
- } catch (Exception e) {
- fail("Closing an unstarted FilePatternReader should not throw an exception");
+ Random random = new Random(0L);
+
+ @Rule
+ public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @BeforeClass
+ public static void setUpOnWinWithMissingHadoopHome() throws IOException {
+ HadoopWorkarounds.winTests();
+ }
+
+ @Test
+ public void testFullyReadSingleFile() throws Exception {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
+ File file = createFileWithData("tmp.seq", expectedResults);
+
+ HDFSFileSource<IntWritable, Text> source =
+ HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
+ IntWritable.class, Text.class);
+
+ assertEquals(file.length(), source.getEstimatedSizeBytes(null));
+
+ assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
}
- }
-
- @Test
- public void testSplits() throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
-
- List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
- File file = createFileWithData("tmp.avro", expectedResults);
-
- HDFSFileSource<IntWritable, Text> source =
- HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
- IntWritable.class, Text.class);
-
- // Assert that the source produces the expected records
- assertEquals(expectedResults, readFromSource(source, options));
-
- // Split with a small bundle size (has to be at least size of sync interval)
- List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source
- .splitIntoBundles(SequenceFile.SYNC_INTERVAL, options);
- assertTrue(splits.size() > 2);
- SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
- int nonEmptySplits = 0;
- for (BoundedSource<KV<IntWritable, Text>> subSource : splits) {
- if (readFromSource(subSource, options).size() > 0) {
- nonEmptySplits += 1;
- }
+
+ @Test
+ public void testFullyReadFilePattern() throws IOException {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
+ File file1 = createFileWithData("file1", data1);
+
+ List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
+ createFileWithData("file2", data2);
+
+ List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
+ createFileWithData("file3", data3);
+
+ List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
+ createFileWithData("otherfile", data4);
+
+ HDFSFileSource<IntWritable, Text> source =
+ HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
+ SequenceFileInputFormat.class, IntWritable.class, Text.class);
+ List<KV<IntWritable, Text>> expectedResults = new ArrayList<>();
+ expectedResults.addAll(data1);
+ expectedResults.addAll(data2);
+ expectedResults.addAll(data3);
+ assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
+ }
+
+ @Test
+ public void testCloseUnstartedFilePatternReader() throws IOException {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
+ File file1 = createFileWithData("file1", data1);
+
+ List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
+ createFileWithData("file2", data2);
+
+ List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
+ createFileWithData("file3", data3);
+
+ List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
+ createFileWithData("otherfile", data4);
+
+ HDFSFileSource<IntWritable, Text> source =
+ HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
+ SequenceFileInputFormat.class, IntWritable.class, Text.class);
+ Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options);
+ // Closing an unstarted FilePatternReader should not throw an exception.
+ try {
+ reader.close();
+ } catch (Exception e) {
+ fail("Closing an unstarted FilePatternReader should not throw an exception");
+ }
+ }
+
+ @Test
+ public void testSplits() throws Exception {
+ PipelineOptions options = PipelineOptionsFactory.create();
+
+ List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
+ File file = createFileWithData("tmp.avro", expectedResults);
+
+ HDFSFileSource<IntWritable, Text> source =
+ HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
+ IntWritable.class, Text.class);
+
+ // Assert that the source produces the expected records
+ assertEquals(expectedResults, readFromSource(source, options));
+
+ // Split with a small bundle size (has to be at least size of sync interval)
+ List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source
+ .splitIntoBundles(SequenceFile.SYNC_INTERVAL, options);
+ assertTrue(splits.size() > 2);
+ SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
+ int nonEmptySplits = 0;
+ for (BoundedSource<KV<IntWritable, Text>> subSource : splits) {
+ if (readFromSource(subSource, options).size() > 0) {
+ nonEmptySplits += 1;
+ }
+ }
+ assertTrue(nonEmptySplits > 2);
}
- assertTrue(nonEmptySplits > 2);
- }
-
- private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
- throws IOException {
- File tmpFile = tmpFolder.newFile(filename);
- try (Writer writer = SequenceFile.createWriter(new Configuration(),
- Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
- Writer.file(new Path(tmpFile.toURI())))) {
-
- for (KV<IntWritable, Text> record : records) {
- writer.append(record.getKey(), record.getValue());
- }
+
+ private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
+ throws IOException {
+ File tmpFile = tmpFolder.newFile(filename);
+ try (Writer writer = SequenceFile.createWriter(new Configuration(),
+ Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
+ Writer.file(new Path(tmpFile.toURI())))) {
+
+ for (KV<IntWritable, Text> record : records) {
+ writer.append(record.getKey(), record.getValue());
+ }
+ }
+ return tmpFile;
}
- return tmpFile;
- }
-
- private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength,
- int numItems, int offset) {
- List<KV<IntWritable, Text>> records = new ArrayList<>();
- for (int i = 0; i < numItems; i++) {
- IntWritable key = new IntWritable(i + offset);
- Text value = new Text(createRandomString(dataItemLength));
- records.add(KV.of(key, value));
+
+ private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength,
+ int numItems, int offset) {
+ List<KV<IntWritable, Text>> records = new ArrayList<>();
+ for (int i = 0; i < numItems; i++) {
+ IntWritable key = new IntWritable(i + offset);
+ Text value = new Text(createRandomString(dataItemLength));
+ records.add(KV.of(key, value));
+ }
+ return records;
}
- return records;
- }
-
- private String createRandomString(int length) {
- char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < length; i++) {
- builder.append(chars[random.nextInt(chars.length)]);
+
+ private String createRandomString(int length) {
+ char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < length; i++) {
+ builder.append(chars[random.nextInt(chars.length)]);
+ }
+ return builder.toString();
}
- return builder.toString();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/maven-archetypes/starter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/pom.xml b/sdks/java/maven-archetypes/starter/pom.xml
index 5b6cb2a..9fb21e9 100644
--- a/sdks/java/maven-archetypes/starter/pom.xml
+++ b/sdks/java/maven-archetypes/starter/pom.xml
@@ -60,6 +60,9 @@
<goals>
<goal>integration-test</goal>
</goals>
+ <configuration>
+ <ignoreEOLStyle>true</ignoreEOLStyle> <!-- for win -->
+ </configuration>
</execution>
</executions>
</plugin>
[2/2] incubator-beam git commit: better comments for win workaround
and basic sanity checks for winutils.exe
Posted by rm...@apache.org.
better comments for win workaround and basic sanity checks for winutils.exe
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/460d21cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/460d21cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/460d21cb
Branch: refs/heads/BEAM-357_windows-build-fails
Commit: 460d21cb7070603f789da9d13e12668194c91e9b
Parents: 4188330
Author: Romain manni-Bucau <rm...@gmail.com>
Authored: Tue Jun 21 10:37:05 2016 +0200
Committer: Romain manni-Bucau <rm...@gmail.com>
Committed: Tue Jun 21 10:37:05 2016 +0200
----------------------------------------------------------------------
.../beam/runners/flink/WriteSinkITCase.java | 2 +-
.../beam/sdk/testing/HadoopWorkarounds.java | 109 +++++++++++++++++--
sdks/java/io/hdfs/pom.xml | 9 --
sdks/java/maven-archetypes/starter/pom.xml | 6 +-
4 files changed, 104 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/460d21cb/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
index 1a56350..bb3778d 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
@@ -54,7 +54,7 @@ public class WriteSinkITCase extends JavaProgramTestBase {
@Override
protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
+ resultPath = getTempDirPath("result-" + System.nanoTime());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/460d21cb/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
index ee2e135..1c2aa20 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.testing;
+import static java.util.Arrays.asList;
+
import org.apache.commons.compress.utils.IOUtils;
import java.io.File;
@@ -26,15 +28,21 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
+import java.nio.file.Files;
+import java.util.Arrays;
/**
* A simple class ensure winutils.exe can be found in the JVM.
+ * <p>
+ * See http://wiki.apache.org/hadoop/WindowsProblems for details.
+ * <p>
+ * Note: don't forget to add org.bouncycastle:bcpg-jdk16 dependency to use it.
*/
public class HadoopWorkarounds {
/**
* In practise this method only needs to be called once by JVM
* since hadoop uses static variables to store it.
- *
+ * <p>
* Note: ensure invocation is done before hadoop reads it
* and ensure this folder survives tests
* (avoid temporary folder usage since tests can share it).
@@ -51,6 +59,8 @@ public class HadoopWorkarounds {
// hadoop doesn't have winutils.exe :(: https://issues.apache.org/jira/browse/HADOOP-10051
// so use this github repo temporarly then just use the main tar.gz
/*
+ // note: this commented-out code requires the commons-compress dependency (add it if the code is enabled)
+
String hadoopVersion = VersionInfo.getVersion();
final URL url = new URL("https://archive.apache.org/dist/hadoop/common/
hadoop-" + hadoopVersion + "/hadoop-" + hadoopVersion + ".tar.gz");
@@ -97,19 +107,49 @@ public class HadoopWorkarounds {
+ "-Dhadoop.home.dir so we'll download winutils.exe");
new File(hadoopHome, "bin").mkdirs();
- final URL url;
- try {
- url = new URL("https://github.com/steveloughran/winutils/"
- + "raw/master/hadoop-2.7.1/bin/winutils.exe");
- } catch (final MalformedURLException e) { // unlikely
- throw new IllegalArgumentException(e);
+ final File winutils = new File(hadoopHome, "bin/winutils.exe");
+
+ for (final String suffix : asList("", ".asc")) {
+ final URL url;
+ try {
+ // this is not a random URL - read HADOOP-10051
+ // it is provided and signed with an ASF gpg key.
+
+ // note: 2.6.3 is used because 2.6.4 and 2.7.1 don't have a .asc file
+ url = new URL("https://github.com/steveloughran/winutils/"
+ + "raw/master/hadoop-2.6.3/bin/winutils.exe" + suffix);
+ } catch (final MalformedURLException e) { // unlikely
+ throw new IllegalArgumentException(e);
+ }
+
+ // download winutils.exe
+ try {
+ try (final InputStream is = url.openStream();
+ final OutputStream os = new FileOutputStream(
+ new File(hadoopHome, "bin/winutils.exe" + suffix))) {
+ try {
+ IOUtils.copy(is, os, 1024 * 1024);
+ } catch (final IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ } catch (final IOException e) {
+ throw new IllegalStateException(e);
+ }
}
+
+ // get the gpg key which is supposed to have signed the winutils.exe
+ final File gpg = new File(hadoopHome, "bin/gpg");
try {
- try (final InputStream is = url.openStream();
- final OutputStream os = new FileOutputStream(
- new File(hadoopHome, "bin/winutils.exe"))) {
+ /*
+ key is https://github.com/steveloughran/winutils/blob/master/KEYS
+ but we trust the ASF, not github, so use the one we trust.
+ */
+ final URL gpgUrl = new URL("http://home.apache.org/keys/committer/stevel");
+ try (final InputStream is = gpgUrl.openStream();
+ final OutputStream os = new FileOutputStream(gpg)) {
try {
- IOUtils.copy(is, os, 1024 * 1024);
+ IOUtils.copy(is, os);
} catch (final IOException e) {
throw new IllegalStateException(e);
}
@@ -117,9 +157,56 @@ public class HadoopWorkarounds {
} catch (final IOException e) {
throw new IllegalStateException(e);
}
+
+ final File ascFile = new File(winutils.getParentFile(), winutils.getName() + ".asc");
+ try {
+ sanityCheck(winutils, ascFile, gpg);
+ } catch (IOException e) {
+ throw new IllegalStateException("Invalid download");
+ }
+
System.setProperty("hadoop.home.dir", hadoopHome.getAbsolutePath());
}
+ // TODO: replace with gpg --verify?
+ // for now these are just basic sanity checks to ensure we use the files we expect
+ private static void sanityCheck(
+ final File winutils, final File ascFile, final File gpg)
+ throws IOException {
+
+ final byte[] asc = Files.readAllBytes(ascFile.toPath());
+ final byte[] expectedAsc = ("-----BEGIN PGP SIGNATURE-----\n"
+ + "Comment: GPGTools - https://gpgtools.org\n"
+ + "\n"
+ + "iQIcBAABCgAGBQJWeb5GAAoJEKkkVPkXR4a0qUgP/1u1Z5vV+IvU/8w79HIYX56+\n"
+ + "FHMRGxM5953dggqjhGSBtfx62YA8oxhDP+8qLpQWtfjTC3//CW1Oz5hrkL0m+Am5\n"
+ + "Kf+qiINDLqX3Fsc4wHQvnLMt2pJPmm4K9FtpkedCdAchLOiM6Wr7WtGiWYQAdUh0\n"
+ + "5FjUZLLVx95Kj3cTY+1B/BL+z/hB63Ry2AC29oZG4fCuAH1nTZjhH3vBD1/kzS+E\n"
+ + "LEKHrGh/pP6ADgg9AfJvVmRhidlCVi21ZfwWHAaitwDTMFvtFSGq03A3F6Xn2iyQ\n"
+ + "3H6RcZ8dqEbtUEa1jOh1xNGzqP4oipWe0KQJ/Lx2eiSh8te73k/Pfw1Ta9CuHXqk\n"
+ + "n8ko7cBc/pUm7nXbfjiURtWFJ4corT4oahJQna+GgvYR4BrYVLlSGb5VijTkzb7i\n"
+ + "0XU40BM5sOcDS/I0lkvqKP0mSi+mMJXbm10y0jw2S7KR7KeHLwzybsjco05DfWUD\n"
+ + "fSaCHK726g5SLsWJvZaurwna7+Mepzmo1HpAVy6nAuiAa2OQVIioNyFanIbuhbM3\n"
+ + "7PXBDWbfPOgr1WbYW4TASoepvsuJsAahYf2SlGagByOiDNliDHJi1z+ArfWsCFFh\n"
+ + "fAMMzPLKJwkmKPahyej3MrcywtntX68D7R8wTCAaj3xCxJsvX4IRv6YRk1+hQ2je\n"
+ + "EXQFW2c8nTI6XqtFpsbw\n"
+ + "=42+k\n"
+ + "-----END PGP SIGNATURE-----\n").getBytes("UTF-8");
+ if (!Arrays.equals(asc, expectedAsc)) {
+ throw new IllegalArgumentException(
+ "Invalid asc file, did the repo get corrupted?");
+ }
+
+ final byte[] exe = Files.readAllBytes(winutils.toPath());
+ if (exe.length != 108032 || exe[0] != 77
+ || exe[exe.length - 1] != 0 || exe[exe.length / 3] != -127) {
+ throw new IllegalArgumentException(
+ "Invalid winutils.exe file, did the repo get corrupted?");
+ }
+
+ // for now we ignore the gpg key file because it is useless until we can use gpg tools
+ }
+
/**
* Just a convenient win(File) invocation for tests.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/460d21cb/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index f8e3c14..9c30792 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -83,14 +83,5 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
-
- <!-- see HDFSFileSourceTest commented block
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- <version>1.9</version>
- <scope>test</scope>
- </dependency>
- -->
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/460d21cb/sdks/java/maven-archetypes/starter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/pom.xml b/sdks/java/maven-archetypes/starter/pom.xml
index 9fb21e9..3d8267e 100644
--- a/sdks/java/maven-archetypes/starter/pom.xml
+++ b/sdks/java/maven-archetypes/starter/pom.xml
@@ -61,7 +61,11 @@
<goal>integration-test</goal>
</goals>
<configuration>
- <ignoreEOLStyle>true</ignoreEOLStyle> <!-- for win -->
+ <!--
+ Needed on Windows: project files use \n whereas Windows uses \r\n during the generation.
+ Since it doesn't change the validity of the generated files (java, xml) we are fine doing it.
+ -->
+ <ignoreEOLStyle>true</ignoreEOLStyle>
</configuration>
</execution>
</executions>