You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by rm...@apache.org on 2016/06/21 08:37:26 UTC

[1/2] incubator-beam git commit: fixing build on windows

Repository: incubator-beam
Updated Branches:
  refs/heads/BEAM-357_windows-build-fails [created] 460d21cb7


fixing build on windows


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41883300
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41883300
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41883300

Branch: refs/heads/BEAM-357_windows-build-fails
Commit: 418833001fe6dd581f42f7fcc3c35ef36f292007
Parents: 0e4d0a9
Author: Romain manni-Bucau <rm...@gmail.com>
Authored: Sun Jun 19 21:19:57 2016 +0200
Committer: Romain manni-Bucau <rm...@gmail.com>
Committed: Sun Jun 19 21:19:57 2016 +0200

----------------------------------------------------------------------
 .../beam/runners/flink/WriteSinkITCase.java     |  13 +
 .../beam/runners/spark/SimpleWordCountTest.java |   8 +
 .../beam/runners/spark/io/AvroPipelineTest.java |   7 +
 .../beam/runners/spark/io/NumShardsTest.java    |   7 +
 .../io/hadoop/HadoopFileFormatPipelineTest.java |   7 +
 .../translation/TransformTranslatorTest.java    |   7 +
 .../src/main/resources/beam/checkstyle.xml      |   4 +-
 .../org/apache/beam/sdk/io/FileBasedSink.java   |   7 +-
 .../beam/sdk/testing/HadoopWorkarounds.java     | 129 +++++++++
 sdks/java/io/hdfs/pom.xml                       |   9 +
 .../beam/sdk/io/hdfs/HDFSFileSourceTest.java    | 264 ++++++++++---------
 sdks/java/maven-archetypes/starter/pom.xml      |   3 +
 12 files changed, 334 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
index 36d3aef..1a56350 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.URI;
 
@@ -75,6 +76,18 @@ public class WriteSinkITCase extends JavaProgramTestBase {
     p.run();
   }
 
+
+  @Override
+  public void stopCluster() throws Exception {
+    try {
+      super.stopCluster();
+    } catch (final IOException ioe) {
+      if (ioe.getMessage().startsWith("Unable to delete file")) {
+        // that's ok for the test itself, just the OS playing with us on cleanup phase
+      }
+    }
+  }
+
   /**
    * Simple custom sink which writes to a file.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index 2b4464d..4980995 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.HadoopWorkarounds;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
@@ -40,11 +41,13 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
 import org.apache.commons.io.FileUtils;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
@@ -61,6 +64,11 @@ public class SimpleWordCountTest {
   private static final Set<String> EXPECTED_COUNT_SET =
       ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
 
+  @BeforeClass
+  public static void initWin() throws IOException {
+    HadoopWorkarounds.winTests();
+  }
+
   @Test
   public void testInMem() throws Exception {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index f358878..f6d0d55 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -25,6 +25,7 @@ import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.HadoopWorkarounds;
 import org.apache.beam.sdk.values.PCollection;
 
 import com.google.common.collect.Lists;
@@ -38,6 +39,7 @@ import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -58,6 +60,11 @@ public class AvroPipelineTest {
   @Rule
   public final TemporaryFolder tmpDir = new TemporaryFolder();
 
+  @BeforeClass
+  public static void initWin() throws IOException {
+    HadoopWorkarounds.winTests();
+  }
+
   @Before
   public void setUp() throws IOException {
     inputFile = tmpDir.newFile("test.avro");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 23d4592..8a864c4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.HadoopWorkarounds;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
@@ -38,6 +39,7 @@ import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -64,6 +66,11 @@ public class NumShardsTest {
   @Rule
   public final TemporaryFolder tmpDir = new TemporaryFolder();
 
+  @BeforeClass
+  public static void initWin() throws IOException {
+    HadoopWorkarounds.winTests();
+  }
+
   @Before
   public void setUp() throws IOException {
     outputDir = tmpDir.newFolder("out");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index eaa508c..767682e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.runners.spark.coders.WritableCoder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.HadoopWorkarounds;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -58,6 +60,11 @@ public class HadoopFileFormatPipelineTest {
   @Rule
   public final TemporaryFolder tmpDir = new TemporaryFolder();
 
+  @BeforeClass
+  public static void initWin() throws IOException {
+    HadoopWorkarounds.winTests();
+  }
+
   @Before
   public void setUp() throws IOException {
     inputFile = tmpDir.newFile("test.seq");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index b593316..fec0dc9 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -28,10 +28,12 @@ import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.testing.HadoopWorkarounds;
 import org.apache.beam.sdk.values.PCollection;
 
 import com.google.common.base.Charsets;
 
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -51,6 +53,11 @@ import java.util.List;
 public class TransformTranslatorTest {
   @Rule public TemporaryFolder tmp = new TemporaryFolder();
 
+  @BeforeClass
+  public static void initWin() throws IOException {
+    HadoopWorkarounds.winTests();
+  }
+
   /**
    * Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline
    * in DirectRunner and on SparkPipelineRunner, with the mapped dataflow-to-spark

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
index 311f599..457675a 100644
--- a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
@@ -29,7 +29,9 @@ page at http://checkstyle.sourceforge.net/config.html -->
     <!-- Checks that there are no tab characters in the file. -->
   </module>
 
-  <module name="NewlineAtEndOfFile"/>
+  <module name="NewlineAtEndOfFile">
+    <property name="lineSeparator" value="lf" />
+  </module>
 
   <module name="RegexpSingleline">
     <!-- Checks that TODOs don't have stuff in parenthesis, e.g., username. -->

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 521f54b..045d6ad 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -35,6 +35,7 @@ import com.google.common.collect.Ordering;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.channels.WritableByteChannel;
@@ -645,7 +646,11 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     private void copyOne(String source, String destination) throws IOException {
       try {
         // Copy the source file, replacing the existing destination.
-        Files.copy(Paths.get(source), Paths.get(destination), StandardCopyOption.REPLACE_EXISTING);
+        // Paths.get(x) will not work on Windows because of the ":" after the drive letter
+        Files.copy(
+                new File(source).toPath(),
+                new File(destination).toPath(),
+                StandardCopyOption.REPLACE_EXISTING);
       } catch (NoSuchFileException e) {
         LOG.debug("{} does not exist.", source);
         // Suppress exception if file does not exist.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
new file mode 100644
index 0000000..ee2e135
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.commons.compress.utils.IOUtils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+/**
+ * A simple class to ensure winutils.exe can be found by the JVM.
+ */
+public class HadoopWorkarounds {
+    /**
+     * In practice this method only needs to be called once per JVM
+     * since Hadoop uses static variables to store it.
+     *
+     * Note: ensure invocation is done before hadoop reads it
+     * and ensure this folder survives tests
+     * (avoid temporary folder usage since tests can share it).
+     *
+     * @param hadoopHome where to fake hadoop home.
+     */
+    public static void win(final File hadoopHome) {
+        // if (Shell.osType != Shell.OSType.OS_TYPE_WIN) { // don't do that to not load Shell yet
+        if (!System.getProperty("os.name", "").startsWith("Windows")
+                || System.getProperty("hadoop.home.dir") != null) {
+            return;
+        }
+
+        // hadoop doesn't have winutils.exe :(: https://issues.apache.org/jira/browse/HADOOP-10051
+        // so use this github repo temporarily, then just use the main tar.gz
+        /*
+        String hadoopVersion = VersionInfo.getVersion();
+        final URL url = new URL("https://archive.apache.org/dist/hadoop/common/
+                  hadoop-" + hadoopVersion + "/hadoop-" + hadoopVersion + ".tar.gz");
+        final File hadoopTar = tmpFolder.newFile();
+        try (final InputStream is = new GZIPInputStream(url.openStream());
+             final OutputStream os = new FileOutputStream(hadoopTar)) {
+          System.out.println("Downloading Hadoop in " + hadoopTar + ", " +
+                  "this can take a while, if you have it locally " +
+                  "maybe set \"hadoop.home.dir\" system property");
+          IOUtils.copyLarge(is, os, new byte[1024 * 1024]);
+        }
+
+        final File hadoopHome = tmpFolder.newFolder();
+        try (final ArchiveInputStream stream = new TarArchiveInputStream(
+                new FileInputStream(hadoopTar))) {
+          ArchiveEntry entry;
+          while ((entry = stream.getNextEntry()) != null) {
+            if (entry.isDirectory()) {
+              FileUtils.forceMkdir(new File(hadoopHome, entry.getName()));
+              continue;
+            }
+            final File out = new File(hadoopHome, entry.getName());
+            FileUtils.forceMkdir(out.getParentFile());
+            try (final OutputStream os = new FileOutputStream(out)) {
+              IOUtils.copy(stream, os);
+            }
+          }
+        }
+
+        final String hadoopRoot = "hadoop-" + hadoopVersion;
+        final File[] files = hadoopHome.listFiles(new FileFilter() {
+          @Override
+          public boolean accept(final File pathname) {
+            return pathname.isDirectory() && pathname.getName().equals(hadoopRoot);
+          }
+        });
+        if (files == null || files.length != 1) {
+          throw new IllegalStateException("Didn't find hadoop in " + hadoopHome);
+        }
+        System.setProperty("hadoop.home.dir", files[0].getAbsolutePath());
+        */
+
+        System.out.println("You are on windows (sorry) and you don't set "
+                + "-Dhadoop.home.dir so we'll download winutils.exe");
+
+        new File(hadoopHome, "bin").mkdirs();
+        final URL url;
+        try {
+            url = new URL("https://github.com/steveloughran/winutils/"
+                    + "raw/master/hadoop-2.7.1/bin/winutils.exe");
+        } catch (final MalformedURLException e) { // unlikely
+            throw new IllegalArgumentException(e);
+        }
+        try {
+            try (final InputStream is = url.openStream();
+                 final OutputStream os = new FileOutputStream(
+                         new File(hadoopHome, "bin/winutils.exe"))) {
+                try {
+                    IOUtils.copy(is, os, 1024 * 1024);
+                } catch (final IOException e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+        } catch (final IOException e) {
+            throw new IllegalStateException(e);
+        }
+        System.setProperty("hadoop.home.dir", hadoopHome.getAbsolutePath());
+    }
+
+    /**
+     * Just a convenient win(File) invocation for tests.
+     */
+    public static void winTests() {
+        win(new File("target/hadoop-win"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index 9c30792..f8e3c14 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -83,5 +83,14 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <!-- see HDFSFileSourceTest commented block
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-compress</artifactId>
+      <version>1.9</version>
+      <scope>test</scope>
+    </dependency>
+    -->
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
index 67df7bc..2ce1af7 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
@@ -28,9 +28,9 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.HadoopWorkarounds;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.values.KV;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -38,6 +38,7 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -53,138 +54,143 @@ import java.util.Random;
  */
 public class HDFSFileSourceTest {
 
-  Random random = new Random(0L);
-
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Test
-  public void testFullyReadSingleFile() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
-    File file = createFileWithData("tmp.seq", expectedResults);
-
-    HDFSFileSource<IntWritable, Text> source =
-        HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
-            IntWritable.class, Text.class);
-
-    assertEquals(file.length(), source.getEstimatedSizeBytes(null));
-
-    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
-  }
-
-  @Test
-  public void testFullyReadFilePattern() throws IOException {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
-    File file1 = createFileWithData("file1", data1);
-
-    List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
-    createFileWithData("file2", data2);
-
-    List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
-    createFileWithData("file3", data3);
-
-    List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
-    createFileWithData("otherfile", data4);
-
-    HDFSFileSource<IntWritable, Text> source =
-        HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
-            SequenceFileInputFormat.class, IntWritable.class, Text.class);
-    List<KV<IntWritable, Text>> expectedResults = new ArrayList<>();
-    expectedResults.addAll(data1);
-    expectedResults.addAll(data2);
-    expectedResults.addAll(data3);
-    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
-  }
-
-  @Test
-  public void testCloseUnstartedFilePatternReader() throws IOException {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
-    File file1 = createFileWithData("file1", data1);
-
-    List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
-    createFileWithData("file2", data2);
-
-    List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
-    createFileWithData("file3", data3);
-
-    List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
-    createFileWithData("otherfile", data4);
-
-    HDFSFileSource<IntWritable, Text> source =
-        HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
-            SequenceFileInputFormat.class, IntWritable.class, Text.class);
-    Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options);
-    // Closing an unstarted FilePatternReader should not throw an exception.
-    try {
-      reader.close();
-    } catch (Exception e) {
-      fail("Closing an unstarted FilePatternReader should not throw an exception");
+    Random random = new Random(0L);
+
+    @Rule
+    public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
+    @BeforeClass
+    public static void setUpOnWinWithMissingHadoopHome() throws IOException {
+        HadoopWorkarounds.winTests();
+    }
+
+    @Test
+    public void testFullyReadSingleFile() throws Exception {
+        PipelineOptions options = PipelineOptionsFactory.create();
+        List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
+        File file = createFileWithData("tmp.seq", expectedResults);
+
+        HDFSFileSource<IntWritable, Text> source =
+                HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
+                        IntWritable.class, Text.class);
+
+        assertEquals(file.length(), source.getEstimatedSizeBytes(null));
+
+        assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
     }
-  }
-
-  @Test
-  public void testSplits() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-
-    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
-    File file = createFileWithData("tmp.avro", expectedResults);
-
-    HDFSFileSource<IntWritable, Text> source =
-        HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
-            IntWritable.class, Text.class);
-
-    // Assert that the source produces the expected records
-    assertEquals(expectedResults, readFromSource(source, options));
-
-    // Split with a small bundle size (has to be at least size of sync interval)
-    List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source
-        .splitIntoBundles(SequenceFile.SYNC_INTERVAL, options);
-    assertTrue(splits.size() > 2);
-    SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
-    int nonEmptySplits = 0;
-    for (BoundedSource<KV<IntWritable, Text>> subSource : splits) {
-      if (readFromSource(subSource, options).size() > 0) {
-        nonEmptySplits += 1;
-      }
+
+    @Test
+    public void testFullyReadFilePattern() throws IOException {
+        PipelineOptions options = PipelineOptionsFactory.create();
+        List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
+        File file1 = createFileWithData("file1", data1);
+
+        List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
+        createFileWithData("file2", data2);
+
+        List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
+        createFileWithData("file3", data3);
+
+        List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
+        createFileWithData("otherfile", data4);
+
+        HDFSFileSource<IntWritable, Text> source =
+                HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
+                        SequenceFileInputFormat.class, IntWritable.class, Text.class);
+        List<KV<IntWritable, Text>> expectedResults = new ArrayList<>();
+        expectedResults.addAll(data1);
+        expectedResults.addAll(data2);
+        expectedResults.addAll(data3);
+        assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
+    }
+
+    @Test
+    public void testCloseUnstartedFilePatternReader() throws IOException {
+        PipelineOptions options = PipelineOptionsFactory.create();
+        List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
+        File file1 = createFileWithData("file1", data1);
+
+        List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
+        createFileWithData("file2", data2);
+
+        List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
+        createFileWithData("file3", data3);
+
+        List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
+        createFileWithData("otherfile", data4);
+
+        HDFSFileSource<IntWritable, Text> source =
+                HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
+                        SequenceFileInputFormat.class, IntWritable.class, Text.class);
+        Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options);
+        // Closing an unstarted FilePatternReader should not throw an exception.
+        try {
+            reader.close();
+        } catch (Exception e) {
+            fail("Closing an unstarted FilePatternReader should not throw an exception");
+        }
+    }
+
+    @Test
+    public void testSplits() throws Exception {
+        PipelineOptions options = PipelineOptionsFactory.create();
+
+        List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
+        File file = createFileWithData("tmp.avro", expectedResults);
+
+        HDFSFileSource<IntWritable, Text> source =
+                HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
+                        IntWritable.class, Text.class);
+
+        // Assert that the source produces the expected records
+        assertEquals(expectedResults, readFromSource(source, options));
+
+        // Split with a small bundle size (has to be at least size of sync interval)
+        List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source
+                .splitIntoBundles(SequenceFile.SYNC_INTERVAL, options);
+        assertTrue(splits.size() > 2);
+        SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
+        int nonEmptySplits = 0;
+        for (BoundedSource<KV<IntWritable, Text>> subSource : splits) {
+            if (readFromSource(subSource, options).size() > 0) {
+                nonEmptySplits += 1;
+            }
+        }
+        assertTrue(nonEmptySplits > 2);
     }
-    assertTrue(nonEmptySplits > 2);
-  }
-
-  private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
-      throws IOException {
-    File tmpFile = tmpFolder.newFile(filename);
-    try (Writer writer = SequenceFile.createWriter(new Configuration(),
-          Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
-          Writer.file(new Path(tmpFile.toURI())))) {
-
-      for (KV<IntWritable, Text> record : records) {
-        writer.append(record.getKey(), record.getValue());
-      }
+
+    private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
+            throws IOException {
+        File tmpFile = tmpFolder.newFile(filename);
+        try (Writer writer = SequenceFile.createWriter(new Configuration(),
+                Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
+                Writer.file(new Path(tmpFile.toURI())))) {
+
+            for (KV<IntWritable, Text> record : records) {
+                writer.append(record.getKey(), record.getValue());
+            }
+        }
+        return tmpFile;
     }
-    return tmpFile;
-  }
-
-  private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength,
-      int numItems, int offset) {
-    List<KV<IntWritable, Text>> records = new ArrayList<>();
-    for (int i = 0; i < numItems; i++) {
-      IntWritable key = new IntWritable(i + offset);
-      Text value = new Text(createRandomString(dataItemLength));
-      records.add(KV.of(key, value));
+
+    private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength,
+                                                            int numItems, int offset) {
+        List<KV<IntWritable, Text>> records = new ArrayList<>();
+        for (int i = 0; i < numItems; i++) {
+            IntWritable key = new IntWritable(i + offset);
+            Text value = new Text(createRandomString(dataItemLength));
+            records.add(KV.of(key, value));
+        }
+        return records;
     }
-    return records;
-  }
-
-  private String createRandomString(int length) {
-    char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
-    StringBuilder builder = new StringBuilder();
-    for (int i = 0; i < length; i++) {
-      builder.append(chars[random.nextInt(chars.length)]);
+
+    private String createRandomString(int length) {
+        char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < length; i++) {
+            builder.append(chars[random.nextInt(chars.length)]);
+        }
+        return builder.toString();
     }
-    return builder.toString();
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/maven-archetypes/starter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/pom.xml b/sdks/java/maven-archetypes/starter/pom.xml
index 5b6cb2a..9fb21e9 100644
--- a/sdks/java/maven-archetypes/starter/pom.xml
+++ b/sdks/java/maven-archetypes/starter/pom.xml
@@ -60,6 +60,9 @@
               <goals>
                 <goal>integration-test</goal>
               </goals>
+              <configuration>
+                <ignoreEOLStyle>true</ignoreEOLStyle> <!-- for win -->
+              </configuration>
             </execution>
           </executions>
         </plugin>


[2/2] incubator-beam git commit: better comments for win workaround and basic sanity checks for winutils.exe

Posted by rm...@apache.org.
better comments for win workaround and basic sanity checks for winutils.exe


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/460d21cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/460d21cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/460d21cb

Branch: refs/heads/BEAM-357_windows-build-fails
Commit: 460d21cb7070603f789da9d13e12668194c91e9b
Parents: 4188330
Author: Romain manni-Bucau <rm...@gmail.com>
Authored: Tue Jun 21 10:37:05 2016 +0200
Committer: Romain manni-Bucau <rm...@gmail.com>
Committed: Tue Jun 21 10:37:05 2016 +0200

----------------------------------------------------------------------
 .../beam/runners/flink/WriteSinkITCase.java     |   2 +-
 .../beam/sdk/testing/HadoopWorkarounds.java     | 109 +++++++++++++++++--
 sdks/java/io/hdfs/pom.xml                       |   9 --
 sdks/java/maven-archetypes/starter/pom.xml      |   6 +-
 4 files changed, 104 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/460d21cb/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
index 1a56350..bb3778d 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
@@ -54,7 +54,7 @@ public class WriteSinkITCase extends JavaProgramTestBase {
 
   @Override
   protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
+    resultPath = getTempDirPath("result-" + System.nanoTime());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/460d21cb/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
index ee2e135..1c2aa20 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.testing;
 
+import static java.util.Arrays.asList;
+
 import org.apache.commons.compress.utils.IOUtils;
 
 import java.io.File;
@@ -26,15 +28,21 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.nio.file.Files;
+import java.util.Arrays;
 
 /**
  * A simple class ensure winutils.exe can be found in the JVM.
+ * <p>
+ * See http://wiki.apache.org/hadoop/WindowsProblems for details.
+ * <p>
+ * Note: don't forget to add org.bouncycastle:bcpg-jdk16 dependency to use it.
  */
 public class HadoopWorkarounds {
     /**
      * In practise this method only needs to be called once by JVM
      * since hadoop uses static variables to store it.
-     *
+     * <p>
      * Note: ensure invocation is done before hadoop reads it
      * and ensure this folder survives tests
      * (avoid temporary folder usage since tests can share it).
@@ -51,6 +59,8 @@ public class HadoopWorkarounds {
         // hadoop doesn't have winutils.exe :(: https://issues.apache.org/jira/browse/HADOOP-10051
         // so use this github repo temporarly then just use the main tar.gz
         /*
+        // note this commented code requires commons-compress dependency (to add if we use that)
+
         String hadoopVersion = VersionInfo.getVersion();
         final URL url = new URL("https://archive.apache.org/dist/hadoop/common/
                   hadoop-" + hadoopVersion + "/hadoop-" + hadoopVersion + ".tar.gz");
@@ -97,19 +107,49 @@ public class HadoopWorkarounds {
                 + "-Dhadoop.home.dir so we'll download winutils.exe");
 
         new File(hadoopHome, "bin").mkdirs();
-        final URL url;
-        try {
-            url = new URL("https://github.com/steveloughran/winutils/"
-                    + "raw/master/hadoop-2.7.1/bin/winutils.exe");
-        } catch (final MalformedURLException e) { // unlikely
-            throw new IllegalArgumentException(e);
+        final File winutils = new File(hadoopHome, "bin/winutils.exe");
+
+        for (final String suffix : asList("", ".asc")) {
+            final URL url;
+            try {
+                // this is not a random URL - see HADOOP-10051:
+                // the binary is provided and signed with an ASF committer's gpg key.
+
+                // note: we use 2.6.3 because 2.6.4 and 2.7.1 don't have a .asc signature
+                url = new URL("https://github.com/steveloughran/winutils/"
+                        + "raw/master/hadoop-2.6.3/bin/winutils.exe" + suffix);
+            } catch (final MalformedURLException e) { // unlikely
+                throw new IllegalArgumentException(e);
+            }
+
+            // download winutils.exe
+            try {
+                try (final InputStream is = url.openStream();
+                     final OutputStream os = new FileOutputStream(
+                             new File(hadoopHome, "bin/winutils.exe" + suffix))) {
+                    try {
+                        IOUtils.copy(is, os, 1024 * 1024);
+                    } catch (final IOException e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
+            } catch (final IOException e) {
+                throw new IllegalStateException(e);
+            }
         }
+
+        // get the gpg key which is supposed to have signed the winutils.exe
+        final File gpg = new File(hadoopHome, "bin/gpg");
         try {
-            try (final InputStream is = url.openStream();
-                 final OutputStream os = new FileOutputStream(
-                         new File(hadoopHome, "bin/winutils.exe"))) {
+            /*
+            the key is also at https://github.com/steveloughran/winutils/blob/master/KEYS
+            but we trust the ASF, not github, so we use the copy hosted by the ASF.
+             */
+            final URL gpgUrl = new URL("http://home.apache.org/keys/committer/stevel");
+            try (final InputStream is = gpgUrl.openStream();
+                 final OutputStream os = new FileOutputStream(gpg)) {
                 try {
-                    IOUtils.copy(is, os, 1024 * 1024);
+                    IOUtils.copy(is, os);
                 } catch (final IOException e) {
                     throw new IllegalStateException(e);
                 }
@@ -117,9 +157,56 @@ public class HadoopWorkarounds {
         } catch (final IOException e) {
             throw new IllegalStateException(e);
         }
+
+        final File ascFile = new File(winutils.getParentFile(), winutils.getName() + ".asc");
+        try {
+            sanityCheck(winutils, ascFile, gpg);
+        } catch (IOException e) {
+            throw new IllegalStateException("Invalid download");
+        }
+
         System.setProperty("hadoop.home.dir", hadoopHome.getAbsolutePath());
     }
 
+    // TODO: replace with gpg --verify?
+    // for now these are just basic sanity checks to ensure we use the files we expect
+    private static void sanityCheck(
+            final File winutils, final File ascFile, final File gpg)
+            throws IOException {
+
+        final byte[] asc = Files.readAllBytes(ascFile.toPath());
+        final byte[] expectedAsc = ("-----BEGIN PGP SIGNATURE-----\n"
+                + "Comment: GPGTools - https://gpgtools.org\n"
+                + "\n"
+                + "iQIcBAABCgAGBQJWeb5GAAoJEKkkVPkXR4a0qUgP/1u1Z5vV+IvU/8w79HIYX56+\n"
+                + "FHMRGxM5953dggqjhGSBtfx62YA8oxhDP+8qLpQWtfjTC3//CW1Oz5hrkL0m+Am5\n"
+                + "Kf+qiINDLqX3Fsc4wHQvnLMt2pJPmm4K9FtpkedCdAchLOiM6Wr7WtGiWYQAdUh0\n"
+                + "5FjUZLLVx95Kj3cTY+1B/BL+z/hB63Ry2AC29oZG4fCuAH1nTZjhH3vBD1/kzS+E\n"
+                + "LEKHrGh/pP6ADgg9AfJvVmRhidlCVi21ZfwWHAaitwDTMFvtFSGq03A3F6Xn2iyQ\n"
+                + "3H6RcZ8dqEbtUEa1jOh1xNGzqP4oipWe0KQJ/Lx2eiSh8te73k/Pfw1Ta9CuHXqk\n"
+                + "n8ko7cBc/pUm7nXbfjiURtWFJ4corT4oahJQna+GgvYR4BrYVLlSGb5VijTkzb7i\n"
+                + "0XU40BM5sOcDS/I0lkvqKP0mSi+mMJXbm10y0jw2S7KR7KeHLwzybsjco05DfWUD\n"
+                + "fSaCHK726g5SLsWJvZaurwna7+Mepzmo1HpAVy6nAuiAa2OQVIioNyFanIbuhbM3\n"
+                + "7PXBDWbfPOgr1WbYW4TASoepvsuJsAahYf2SlGagByOiDNliDHJi1z+ArfWsCFFh\n"
+                + "fAMMzPLKJwkmKPahyej3MrcywtntX68D7R8wTCAaj3xCxJsvX4IRv6YRk1+hQ2je\n"
+                + "EXQFW2c8nTI6XqtFpsbw\n"
+                + "=42+k\n"
+                + "-----END PGP SIGNATURE-----\n").getBytes("UTF-8");
+        if (!Arrays.equals(asc, expectedAsc)) {
+            throw new IllegalArgumentException(
+                    "Invalid asc file, did the repo get corrupted?");
+        }
+
+        final byte[] exe = Files.readAllBytes(winutils.toPath());
+        if (exe.length != 108032 || exe[0] != 77
+                || exe[exe.length - 1] != 0 || exe[exe.length / 3] != -127) {
+            throw new IllegalArgumentException(
+                    "Invalid winutils.exe file, did the repo get corrupted?");
+        }
+
+        // for now we ignore the gpg key because it is useless until we can use gpg tools
+    }
+
     /**
      * Just a convenient win(File) invocation for tests.
      */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/460d21cb/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index f8e3c14..9c30792 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -83,14 +83,5 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
-
-    <!-- see HDFSFileSourceTest commented block
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-compress</artifactId>
-      <version>1.9</version>
-      <scope>test</scope>
-    </dependency>
-    -->
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/460d21cb/sdks/java/maven-archetypes/starter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/pom.xml b/sdks/java/maven-archetypes/starter/pom.xml
index 9fb21e9..3d8267e 100644
--- a/sdks/java/maven-archetypes/starter/pom.xml
+++ b/sdks/java/maven-archetypes/starter/pom.xml
@@ -61,7 +61,11 @@
                 <goal>integration-test</goal>
               </goals>
               <configuration>
-                <ignoreEOLStyle>true</ignoreEOLStyle> <!-- for win -->
+                <!--
+                Needed on Windows: project files use \n line endings while Windows produces \r\n during generation.
+                Since this doesn't change the validity of the generated files (java, xml), ignoring the EOL style is safe.
+                -->
+                <ignoreEOLStyle>true</ignoreEOLStyle>
               </configuration>
             </execution>
           </executions>