You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/04/04 12:04:37 UTC

[1/2] beam git commit: [BEAM-1875] Remove Spark-runner-custom Hadoop and Avro IOs

Repository: beam
Updated Branches:
  refs/heads/master cc8e0b9df -> 45f63eb6c


[BEAM-1875] Remove Spark-runner-custom Hadoop and Avro IOs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6f7defdb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6f7defdb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6f7defdb

Branch: refs/heads/master
Commit: 6f7defdbb1aa14bf5577bf24ba1d3307cd2fc8b9
Parents: cc8e0b9
Author: Amit Sela <am...@gmail.com>
Authored: Tue Apr 4 10:31:31 2017 +0300
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue Apr 4 13:58:40 2017 +0200

----------------------------------------------------------------------
 runners/spark/pom.xml                           |  19 --
 .../beam/runners/spark/io/hadoop/HadoopIO.java  | 216 ----------------
 .../spark/io/hadoop/ShardNameBuilder.java       | 111 --------
 .../spark/io/hadoop/ShardNameTemplateAware.java |  31 ---
 .../io/hadoop/ShardNameTemplateHelper.java      |  63 -----
 .../io/hadoop/TemplatedAvroKeyOutputFormat.java |  45 ----
 .../TemplatedSequenceFileOutputFormat.java      |  45 ----
 .../io/hadoop/TemplatedTextOutputFormat.java    |  45 ----
 .../runners/spark/io/hadoop/package-info.java   |  22 --
 .../spark/translation/TransformTranslator.java  | 251 -------------------
 .../io/hadoop/HadoopFileFormatPipelineTest.java | 121 ---------
 .../spark/io/hadoop/ShardNameBuilderTest.java   |  88 -------
 runners/spark/src/test/resources/test_text.txt  |   2 -
 13 files changed, 1059 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 96480cd..dd174bf 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -248,19 +248,6 @@
       </exclusions>
     </dependency>
     <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro-mapred</artifactId>
-      <version>${avro.version}</version>
-      <classifier>hadoop2</classifier>
-      <exclusions>
-        <!-- exclude old Jetty version of servlet API -->
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>servlet-api</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
       <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-core</artifactId>
       <version>${dropwizard.metrics.version}</version>
@@ -318,12 +305,6 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
deleted file mode 100644
index f2457ce..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.io.hadoop;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.sdk.io.ShardNameTemplate;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-/**
- * Spark native HadoopIO.
- */
-public final class HadoopIO {
-
-  private HadoopIO() {
-  }
-
-  /**
-   * Read operation from HDFS.
-   */
-  public static final class Read {
-
-    private Read() {
-    }
-
-    public static <K, V> Bound<K, V> from(String filepattern,
-        Class<? extends FileInputFormat<K, V>> format, Class<K> key, Class<V> value) {
-      return new Bound<>(filepattern, format, key, value);
-    }
-
-    /**
-     * A {@link PTransform} reading bounded collection of data from HDFS.
-     * @param <K> the type of the keys
-     * @param <V> the type of the values
-     */
-    public static class Bound<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
-
-      private final String filepattern;
-      private final Class<? extends FileInputFormat<K, V>> formatClass;
-      private final Class<K> keyClass;
-      private final Class<V> valueClass;
-
-      Bound(String filepattern, Class<? extends FileInputFormat<K, V>> format, Class<K> key,
-          Class<V> value) {
-        checkNotNull(filepattern, "need to set the filepattern of an HadoopIO.Read transform");
-        checkNotNull(format, "need to set the format class of an HadoopIO.Read transform");
-        checkNotNull(key, "need to set the key class of an HadoopIO.Read transform");
-        checkNotNull(value, "need to set the value class of an HadoopIO.Read transform");
-        this.filepattern = filepattern;
-        this.formatClass = format;
-        this.keyClass = key;
-        this.valueClass = value;
-      }
-
-      public String getFilepattern() {
-        return filepattern;
-      }
-
-      public Class<? extends FileInputFormat<K, V>> getFormatClass() {
-        return formatClass;
-      }
-
-      public Class<V> getValueClass() {
-        return valueClass;
-      }
-
-      public Class<K> getKeyClass() {
-        return keyClass;
-      }
-
-      @Override
-      public PCollection<KV<K, V>> expand(PBegin input) {
-        return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
-            WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED);
-      }
-
-    }
-
-  }
-
-  /**
-   * Write operation on HDFS.
-   */
-  public static final class Write {
-
-    private Write() {
-    }
-
-    public static <K, V> Bound<K, V> to(String filenamePrefix,
-        Class<? extends FileOutputFormat<K, V>> format, Class<K> key, Class<V> value) {
-      return new Bound<>(filenamePrefix, format, key, value);
-    }
-
-    /**
-     * A {@link PTransform} writing {@link PCollection} on HDFS.
-     */
-    public static class Bound<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {
-
-      /** The filename to write to. */
-      private final String filenamePrefix;
-      /** Suffix to use for each filename. */
-      private final String filenameSuffix;
-      /** Requested number of shards.  0 for automatic. */
-      private final int numShards;
-      /** Shard template string. */
-      private final String shardTemplate;
-      private final Class<? extends FileOutputFormat<K, V>> formatClass;
-      private final Class<K> keyClass;
-      private final Class<V> valueClass;
-      private final Map<String, String> configurationProperties;
-
-      Bound(String filenamePrefix, Class<? extends FileOutputFormat<K, V>> format,
-          Class<K> key,
-          Class<V> value) {
-        this(filenamePrefix, "", 0, ShardNameTemplate.INDEX_OF_MAX, format, key, value,
-            new HashMap<String, String>());
-      }
-
-      Bound(String filenamePrefix, String filenameSuffix, int numShards,
-          String shardTemplate, Class<? extends FileOutputFormat<K, V>> format,
-          Class<K> key, Class<V> value, Map<String, String> configurationProperties) {
-        this.filenamePrefix = filenamePrefix;
-        this.filenameSuffix = filenameSuffix;
-        this.numShards = numShards;
-        this.shardTemplate = shardTemplate;
-        this.formatClass = format;
-        this.keyClass = key;
-        this.valueClass = value;
-        this.configurationProperties = configurationProperties;
-      }
-
-      public Bound<K, V> withoutSharding() {
-        return new Bound<>(filenamePrefix, filenameSuffix, 1, "", formatClass,
-            keyClass, valueClass, configurationProperties);
-      }
-
-      public Bound<K, V> withConfigurationProperty(String key, String value) {
-        configurationProperties.put(key, value);
-        return this;
-      }
-
-      public String getFilenamePrefix() {
-        return filenamePrefix;
-      }
-
-      public String getShardTemplate() {
-        return shardTemplate;
-      }
-
-      public int getNumShards() {
-        return numShards;
-      }
-
-      public String getFilenameSuffix() {
-        return filenameSuffix;
-      }
-
-      public Class<? extends FileOutputFormat<K, V>> getFormatClass() {
-        return formatClass;
-      }
-
-      public Class<V> getValueClass() {
-        return valueClass;
-      }
-
-      public Class<K> getKeyClass() {
-        return keyClass;
-      }
-
-      public Map<String, String> getConfigurationProperties() {
-        return configurationProperties;
-      }
-
-      @Override
-      public PDone expand(PCollection<KV<K, V>> input) {
-        checkNotNull(
-            filenamePrefix, "need to set the filename prefix of an HadoopIO.Write transform");
-        checkNotNull(formatClass, "need to set the format class of an HadoopIO.Write transform");
-        checkNotNull(keyClass, "need to set the key class of an HadoopIO.Write transform");
-        checkNotNull(valueClass, "need to set the value class of an HadoopIO.Write transform");
-
-        checkArgument(
-            ShardNameTemplateAware.class.isAssignableFrom(formatClass),
-            "Format class must implement %s",
-            ShardNameTemplateAware.class.getName());
-
-        return PDone.in(input.getPipeline());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java
deleted file mode 100644
index 11b4b53..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.io.hadoop;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Shard name builder.
- */
-public final class ShardNameBuilder {
-
-  private ShardNameBuilder() {
-  }
-
-  /**
-   * Replace occurrences of uppercase letters 'N' with the given {code}shardCount{code},
-   * left-padded with zeros if necessary.
-   * @see org.apache.beam.sdk.io.ShardNameTemplate
-   * @param template the string template containing uppercase letters 'N'
-   * @param shardCount the total number of shards
-   * @return a string template with 'N' replaced by the shard count
-   */
-  public static String replaceShardCount(String template, int shardCount) {
-    return replaceShardPattern(template, "N+", shardCount);
-  }
-
-  /**
-   * Replace occurrences of uppercase letters 'S' with the given {code}shardNumber{code},
-   * left-padded with zeros if necessary.
-   * @see org.apache.beam.sdk.io.ShardNameTemplate
-   * @param template the string template containing uppercase letters 'S'
-   * @param shardNumber the number of a particular shard
-   * @return a string template with 'S' replaced by the shard number
-   */
-  public static String replaceShardNumber(String template, int shardNumber) {
-    return replaceShardPattern(template, "S+", shardNumber);
-  }
-
-  private static String replaceShardPattern(String template, String pattern, int n) {
-    Pattern p = Pattern.compile(pattern);
-    Matcher m = p.matcher(template);
-    StringBuffer sb = new StringBuffer();
-    while (m.find()) {
-      // replace pattern with a String format string:
-      // index 1, zero-padding flag (0), width length of matched pattern, decimal conversion
-      m.appendReplacement(sb, "%1\\$0" + m.group().length() + "d");
-    }
-    m.appendTail(sb);
-    return String.format(sb.toString(), n);
-  }
-
-  /**
-   * @param pathPrefix a relative or absolute path
-   * @param template a template string
-   * @return the output directory for the given prefix, template and suffix
-   */
-  public static String getOutputDirectory(String pathPrefix, String template) {
-    String out = new Path(pathPrefix + template).getParent().toString();
-    if (out.isEmpty()) {
-      return "./";
-    }
-    return out;
-  }
-
-  /**
-   * @param pathPrefix a relative or absolute path
-   * @param template a template string
-   * @return the prefix of the output filename for the given path prefix and template
-   */
-  public static String getOutputFilePrefix(String pathPrefix, String template) {
-    String name = new Path(pathPrefix + template).getName();
-    if (name.endsWith(template)) {
-      return name.substring(0, name.length() - template.length());
-    } else {
-      return "";
-    }
-  }
-
-  /**
-   * @param pathPrefix a relative or absolute path
-   * @param template a template string
-   * @return the template for the output filename for the given path prefix and
-   * template
-   */
-  public static String getOutputFileTemplate(String pathPrefix, String template) {
-    String name = new Path(pathPrefix + template).getName();
-    if (name.endsWith(template)) {
-      return template;
-    } else {
-      return name;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateAware.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateAware.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateAware.java
deleted file mode 100644
index d78b437..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateAware.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.io.hadoop;
-
-/**
- * A marker interface that implementations of
- * {@link org.apache.hadoop.mapreduce.lib.output.FileOutputFormat} implement to indicate
- * that they produce shard names that adhere to the template in
- * {@link HadoopIO.Write}.
- *
- * <p>Some common shard names are defined in
- * {@link org.apache.beam.sdk.io.ShardNameTemplate}.
- */
-public interface ShardNameTemplateAware {
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
deleted file mode 100644
index 4a7058b..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.io.hadoop;
-
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardNumber;
-
-import java.io.IOException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Shard name template helper.
- */
-public final class ShardNameTemplateHelper {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ShardNameTemplateHelper.class);
-
-  public static final String OUTPUT_FILE_PREFIX = "spark.beam.fileoutputformat.prefix";
-  public static final String OUTPUT_FILE_TEMPLATE = "spark.beam.fileoutputformat.template";
-  public static final String OUTPUT_FILE_SUFFIX = "spark.beam.fileoutputformat.suffix";
-
-  private ShardNameTemplateHelper() {
-  }
-
-  public static <K, V> Path getDefaultWorkFile(FileOutputFormat<K, V> format,
-      TaskAttemptContext context) throws IOException {
-    FileOutputCommitter committer =
-        (FileOutputCommitter) format.getOutputCommitter(context);
-    return new Path(committer.getWorkPath(), getOutputFile(context));
-  }
-
-  private static String getOutputFile(TaskAttemptContext context) {
-    TaskID taskId = context.getTaskAttemptID().getTaskID();
-    int partition = taskId.getId();
-
-    String filePrefix = context.getConfiguration().get(OUTPUT_FILE_PREFIX);
-    String fileTemplate = context.getConfiguration().get(OUTPUT_FILE_TEMPLATE);
-    String fileSuffix = context.getConfiguration().get(OUTPUT_FILE_SUFFIX);
-    return filePrefix + replaceShardNumber(fileTemplate, partition) + fileSuffix;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
deleted file mode 100644
index 62a610b..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.io.hadoop;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * Templated Avro key output format.
- */
-public class TemplatedAvroKeyOutputFormat<T> extends AvroKeyOutputFormat<T>
-    implements ShardNameTemplateAware {
-
-  @Override
-  public void checkOutputSpecs(JobContext job) {
-    // don't fail if the output already exists
-  }
-
-  @Override
-  protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException {
-    Path path = ShardNameTemplateHelper.getDefaultWorkFile(this, context);
-    return path.getFileSystem(context.getConfiguration()).create(path);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
deleted file mode 100644
index ab1263b..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.io.hadoop;
-
-import java.io.IOException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-
-/**
- * Templated sequence file output format.
- */
-public class TemplatedSequenceFileOutputFormat<K, V> extends SequenceFileOutputFormat<K, V>
-    implements ShardNameTemplateAware {
-
-  @Override
-  public void checkOutputSpecs(JobContext job) {
-    // don't fail if the output already exists
-  }
-
-  @Override
-  public Path getDefaultWorkFile(TaskAttemptContext context,
-      String extension) throws IOException {
-    // note that the passed-in extension is ignored since it comes from the template
-    return ShardNameTemplateHelper.getDefaultWorkFile(this, context);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
deleted file mode 100644
index 5a6e9a9..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.io.hadoop;
-
-import java.io.IOException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
-/**
- * Templates text output format.
- */
-public class TemplatedTextOutputFormat<K, V> extends TextOutputFormat<K, V>
-    implements ShardNameTemplateAware {
-
-  @Override
-  public void checkOutputSpecs(JobContext job) {
-    // don't fail if the output already exists
-  }
-
-  @Override
-  public Path getDefaultWorkFile(TaskAttemptContext context,
-      String extension) throws IOException {
-    // note that the passed-in extension is ignored since it comes from the template
-    return ShardNameTemplateHelper.getDefaultWorkFile(this, context);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/package-info.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/package-info.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/package-info.java
deleted file mode 100644
index 70cd0f3..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Spark-specific transforms for reading from and writing to Hadoop file systems (HDFS).
- */
-package org.apache.beam.runners.spark.io.hadoop;

http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index d88ef7e..6290bba 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -20,10 +20,6 @@ package org.apache.beam.runners.spark.translation;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
 import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectSplittable;
 import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers;
 
@@ -31,22 +27,14 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyInputFormat;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.SourceRDD;
-import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
-import org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper;
-import org.apache.beam.runners.spark.io.hadoop.TemplatedAvroKeyOutputFormat;
-import org.apache.beam.runners.spark.io.hadoop.TemplatedTextOutputFormat;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
@@ -54,9 +42,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.Create;
@@ -78,18 +64,11 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Tuple2;
 
 
 /**
@@ -427,119 +406,6 @@ public final class TransformTranslator {
     };
   }
 
-
-  private static <T> TransformEvaluator<TextIO.Read.Bound> readText() {
-    return new TransformEvaluator<TextIO.Read.Bound>() {
-      @Override
-      public void evaluate(TextIO.Read.Bound transform, EvaluationContext context) {
-        String pattern = transform.getFilepattern();
-        JavaRDD<WindowedValue<String>> rdd = context.getSparkContext().textFile(pattern)
-            .map(WindowingHelpers.<String>windowFunction());
-        context.putDataset(transform, new BoundedDataset<>(rdd));
-      }
-
-      @Override
-      public String toNativeString() {
-        return "sparkContext.textFile(...)";
-      }
-    };
-  }
-
-  private static <T> TransformEvaluator<TextIO.Write.Bound> writeText() {
-    return new TransformEvaluator<TextIO.Write.Bound>() {
-      @Override
-      public void evaluate(TextIO.Write.Bound transform, EvaluationContext context) {
-        @SuppressWarnings("unchecked")
-        JavaPairRDD<T, Void> last =
-            ((BoundedDataset<T>) context.borrowDataset(transform)).getRDD()
-            .map(WindowingHelpers.<T>unwindowFunction())
-            .mapToPair(new PairFunction<T, T,
-                    Void>() {
-              @Override
-              public Tuple2<T, Void> call(T t) throws Exception {
-                return new Tuple2<>(t, null);
-              }
-            });
-        ShardTemplateInformation shardTemplateInfo =
-            new ShardTemplateInformation(transform.getNumShards(),
-                transform.getShardTemplate(), transform.getFilenamePrefix(),
-                transform.getFilenameSuffix());
-        writeHadoopFile(last, new Configuration(), shardTemplateInfo, Text.class,
-            NullWritable.class, TemplatedTextOutputFormat.class);
-      }
-
-      @Override
-      public String toNativeString() {
-        return "saveAsNewAPIHadoopFile(...)";
-      }
-    };
-  }
-
-  private static <T> TransformEvaluator<AvroIO.Read.Bound<T>> readAvro() {
-    return new TransformEvaluator<AvroIO.Read.Bound<T>>() {
-      @Override
-      public void evaluate(AvroIO.Read.Bound<T> transform, EvaluationContext context) {
-        String pattern = transform.getFilepattern();
-        JavaSparkContext jsc = context.getSparkContext();
-        @SuppressWarnings("unchecked")
-        JavaRDD<AvroKey<T>> avroFile = (JavaRDD<AvroKey<T>>) (JavaRDD<?>)
-            jsc.newAPIHadoopFile(pattern,
-                                 AvroKeyInputFormat.class,
-                                 AvroKey.class, NullWritable.class,
-                                 new Configuration()).keys();
-        JavaRDD<WindowedValue<T>> rdd = avroFile.map(
-            new Function<AvroKey<T>, T>() {
-              @Override
-              public T call(AvroKey<T> key) {
-                return key.datum();
-              }
-            }).map(WindowingHelpers.<T>windowFunction());
-        context.putDataset(transform, new BoundedDataset<>(rdd));
-      }
-
-      @Override
-      public String toNativeString() {
-        return "sparkContext.newAPIHadoopFile(...)";
-      }
-    };
-  }
-
-  private static <T> TransformEvaluator<AvroIO.Write.Bound<T>> writeAvro() {
-    return new TransformEvaluator<AvroIO.Write.Bound<T>>() {
-      @Override
-      public void evaluate(AvroIO.Write.Bound<T> transform, EvaluationContext context) {
-        Job job;
-        try {
-          job = Job.getInstance();
-        } catch (IOException e) {
-          throw new IllegalStateException(e);
-        }
-        AvroJob.setOutputKeySchema(job, transform.getSchema());
-        @SuppressWarnings("unchecked")
-        JavaPairRDD<AvroKey<T>, NullWritable> last =
-            ((BoundedDataset<T>) context.borrowDataset(transform)).getRDD()
-            .map(WindowingHelpers.<T>unwindowFunction())
-            .mapToPair(new PairFunction<T, AvroKey<T>, NullWritable>() {
-              @Override
-              public Tuple2<AvroKey<T>, NullWritable> call(T t) throws Exception {
-                return new Tuple2<>(new AvroKey<>(t), NullWritable.get());
-              }
-            });
-        ShardTemplateInformation shardTemplateInfo =
-            new ShardTemplateInformation(transform.getNumShards(),
-            transform.getShardTemplate(), transform.getFilenamePrefix(),
-            transform.getFilenameSuffix());
-        writeHadoopFile(last, job.getConfiguration(), shardTemplateInfo,
-            AvroKey.class, NullWritable.class, TemplatedAvroKeyOutputFormat.class);
-      }
-
-      @Override
-      public String toNativeString() {
-        return "mapToPair(<objectToAvroKeyFn>).saveAsNewAPIHadoopFile(...)";
-      }
-    };
-  }
-
   private static <T> TransformEvaluator<Read.Bounded<T>> readBounded() {
     return new TransformEvaluator<Read.Bounded<T>>() {
       @Override
@@ -561,121 +427,6 @@ public final class TransformTranslator {
     };
   }
 
-  private static <K, V> TransformEvaluator<HadoopIO.Read.Bound<K, V>> readHadoop() {
-    return new TransformEvaluator<HadoopIO.Read.Bound<K, V>>() {
-      @Override
-      public void evaluate(HadoopIO.Read.Bound<K, V> transform, EvaluationContext context) {
-        String pattern = transform.getFilepattern();
-        JavaSparkContext jsc = context.getSparkContext();
-        @SuppressWarnings("unchecked")
-        JavaPairRDD<K, V> file = jsc.newAPIHadoopFile(pattern,
-            transform.getFormatClass(),
-            transform.getKeyClass(), transform.getValueClass(),
-            new Configuration());
-        JavaRDD<WindowedValue<KV<K, V>>> rdd =
-            file.map(new Function<Tuple2<K, V>, KV<K, V>>() {
-          @Override
-          public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
-            return KV.of(t2._1(), t2._2());
-          }
-        }).map(WindowingHelpers.<KV<K, V>>windowFunction());
-        context.putDataset(transform, new BoundedDataset<>(rdd));
-      }
-
-      @Override
-      public String toNativeString() {
-        return "sparkContext.newAPIHadoopFile(...)";
-      }
-    };
-  }
-
-  private static <K, V> TransformEvaluator<HadoopIO.Write.Bound<K, V>> writeHadoop() {
-    return new TransformEvaluator<HadoopIO.Write.Bound<K, V>>() {
-      @Override
-      public void evaluate(HadoopIO.Write.Bound<K, V> transform, EvaluationContext context) {
-        @SuppressWarnings("unchecked")
-        JavaPairRDD<K, V> last = ((BoundedDataset<KV<K, V>>) context.borrowDataset(transform))
-            .getRDD()
-            .map(WindowingHelpers.<KV<K, V>>unwindowFunction())
-            .mapToPair(new PairFunction<KV<K, V>, K, V>() {
-              @Override
-              public Tuple2<K, V> call(KV<K, V> t) throws Exception {
-                return new Tuple2<>(t.getKey(), t.getValue());
-              }
-            });
-        ShardTemplateInformation shardTemplateInfo =
-            new ShardTemplateInformation(transform.getNumShards(),
-                transform.getShardTemplate(), transform.getFilenamePrefix(),
-                transform.getFilenameSuffix());
-        Configuration conf = new Configuration();
-        for (Map.Entry<String, String> e : transform.getConfigurationProperties().entrySet()) {
-          conf.set(e.getKey(), e.getValue());
-        }
-        writeHadoopFile(last, conf, shardTemplateInfo,
-            transform.getKeyClass(), transform.getValueClass(), transform.getFormatClass());
-      }
-
-      @Override
-      public String toNativeString() {
-        return "saveAsNewAPIHadoopFile(...)";
-      }
-    };
-  }
-
-  private static final class ShardTemplateInformation {
-    private final int numShards;
-    private final String shardTemplate;
-    private final String filenamePrefix;
-    private final String filenameSuffix;
-
-    private ShardTemplateInformation(int numShards, String shardTemplate, String
-        filenamePrefix, String filenameSuffix) {
-      this.numShards = numShards;
-      this.shardTemplate = shardTemplate;
-      this.filenamePrefix = filenamePrefix;
-      this.filenameSuffix = filenameSuffix;
-    }
-
-    int getNumShards() {
-      return numShards;
-    }
-
-    String getShardTemplate() {
-      return shardTemplate;
-    }
-
-    String getFilenamePrefix() {
-      return filenamePrefix;
-    }
-
-    String getFilenameSuffix() {
-      return filenameSuffix;
-    }
-  }
-
-  private static <K, V> void writeHadoopFile(JavaPairRDD<K, V> rdd, Configuration conf,
-      ShardTemplateInformation shardTemplateInfo, Class<?> keyClass, Class<?> valueClass,
-      Class<? extends FileOutputFormat> formatClass) {
-    int numShards = shardTemplateInfo.getNumShards();
-    String shardTemplate = shardTemplateInfo.getShardTemplate();
-    String filenamePrefix = shardTemplateInfo.getFilenamePrefix();
-    String filenameSuffix = shardTemplateInfo.getFilenameSuffix();
-    if (numShards != 0) {
-      // number of shards was set explicitly, so repartition
-      rdd = rdd.repartition(numShards);
-    }
-    int actualNumShards = rdd.partitions().size();
-    String template = replaceShardCount(shardTemplate, actualNumShards);
-    String outputDir = getOutputDirectory(filenamePrefix, template);
-    String filePrefix = getOutputFilePrefix(filenamePrefix, template);
-    String fileTemplate = getOutputFileTemplate(filenamePrefix, template);
-
-    conf.set(ShardNameTemplateHelper.OUTPUT_FILE_PREFIX, filePrefix);
-    conf.set(ShardNameTemplateHelper.OUTPUT_FILE_TEMPLATE, fileTemplate);
-    conf.set(ShardNameTemplateHelper.OUTPUT_FILE_SUFFIX, filenameSuffix);
-    rdd.saveAsNewAPIHadoopFile(outputDir, keyClass, valueClass, formatClass, conf);
-  }
-
   private static <T, W extends BoundedWindow> TransformEvaluator<Window.Assign<T>> window() {
     return new TransformEvaluator<Window.Assign<T>>() {
       @Override
@@ -847,8 +598,6 @@ public final class TransformTranslator {
 
   static {
     EVALUATORS.put(Read.Bounded.class, readBounded());
-    EVALUATORS.put(HadoopIO.Read.Bound.class, readHadoop());
-    EVALUATORS.put(HadoopIO.Write.Bound.class, writeHadoop());
     EVALUATORS.put(ParDo.MultiOutput.class, parDo());
     EVALUATORS.put(GroupByKey.class, groupByKey());
     EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());

http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
deleted file mode 100644
index 48b5433..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.io.hadoop;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import org.apache.beam.runners.spark.PipelineRule;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.io.hadoop.WritableCoder;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Pipeline on the Hadoop file format test.
- */
-public class HadoopFileFormatPipelineTest {
-
-  private File inputFile;
-  private File outputFile;
-
-  @Rule
-  public final PipelineRule pipelineRule = PipelineRule.batch();
-
-  @Rule
-  public final TemporaryFolder tmpDir = new TemporaryFolder();
-
-  @Before
-  public void setUp() throws IOException {
-    inputFile = tmpDir.newFile("test.seq");
-    outputFile = tmpDir.newFolder("out");
-    outputFile.delete();
-  }
-
-  @Test
-  public void testSequenceFile() throws Exception {
-    populateFile();
-
-    Pipeline p = pipelineRule.createPipeline();
-    @SuppressWarnings("unchecked")
-    Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass =
-        (Class<? extends FileInputFormat<IntWritable, Text>>)
-            (Class<?>) SequenceFileInputFormat.class;
-    HadoopIO.Read.Bound<IntWritable, Text> read =
-        HadoopIO.Read.from(inputFile.getAbsolutePath(),
-            inputFormatClass,
-            IntWritable.class,
-            Text.class);
-    PCollection<KV<IntWritable, Text>> input = p.apply(read)
-        .setCoder(KvCoder.of(WritableCoder.of(IntWritable.class), WritableCoder.of(Text.class)));
-    @SuppressWarnings("unchecked")
-    Class<? extends FileOutputFormat<IntWritable, Text>> outputFormatClass =
-        (Class<? extends FileOutputFormat<IntWritable, Text>>)
-            (Class<?>) TemplatedSequenceFileOutputFormat.class;
-    @SuppressWarnings("unchecked")
-    HadoopIO.Write.Bound<IntWritable, Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(),
-        outputFormatClass, IntWritable.class, Text.class);
-    input.apply(write.withoutSharding());
-    p.run().waitUntilFinish();
-
-    IntWritable key = new IntWritable();
-    Text value = new Text();
-    try (Reader reader = new Reader(new Configuration(),
-        Reader.file(new Path(outputFile.toURI())))) {
-      int i = 0;
-      while (reader.next(key, value)) {
-        assertEquals(i, key.get());
-        assertEquals("value-" + i, value.toString());
-        i++;
-      }
-    }
-  }
-
-  private void populateFile() throws IOException {
-    IntWritable key = new IntWritable();
-    Text value = new Text();
-    try (Writer writer = SequenceFile.createWriter(
-        new Configuration(),
-        Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
-        Writer.file(new Path(this.inputFile.toURI())))) {
-      for (int i = 0; i < 5; i++) {
-        key.set(i);
-        value.set("value-" + i);
-        writer.append(key, value);
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
deleted file mode 100644
index 1f2cf63..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.io.hadoop;
-
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardNumber;
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-/**
- * Test on the {@link ShardNameBuilder}.
- */
-public class ShardNameBuilderTest {
-
-  @Test
-  public void testReplaceShardCount() {
-    assertEquals("", replaceShardCount("", 6));
-    assertEquals("-S-of-6", replaceShardCount("-S-of-N", 6));
-    assertEquals("-SS-of-06", replaceShardCount("-SS-of-NN", 6));
-    assertEquals("-S-of-60", replaceShardCount("-S-of-N", 60));
-    assertEquals("-SS-of-60", replaceShardCount("-SS-of-NN", 60));
-    assertEquals("/part-SSSSS", replaceShardCount("/part-SSSSS", 6));
-  }
-
-  @Test
-  public void testReplaceShardNumber() {
-    assertEquals("", replaceShardNumber("", 5));
-    assertEquals("-5-of-6", replaceShardNumber("-S-of-6", 5));
-    assertEquals("-05-of-06", replaceShardNumber("-SS-of-06", 5));
-    assertEquals("-59-of-60", replaceShardNumber("-S-of-60", 59));
-    assertEquals("-59-of-60", replaceShardNumber("-SS-of-60", 59));
-    assertEquals("/part-00005", replaceShardNumber("/part-SSSSS", 5));
-  }
-
-  @Test
-     public void testGetOutputDirectory() {
-    assertEquals("./", getOutputDirectory("foo", "-S-of-N"));
-    assertEquals("foo", getOutputDirectory("foo/bar", "-S-of-N"));
-    assertEquals("/foo", getOutputDirectory("/foo/bar", "-S-of-N"));
-    assertEquals("hdfs://foo/", getOutputDirectory("hdfs://foo/bar", "-S-of-N"));
-    assertEquals("foo/bar", getOutputDirectory("foo/bar", "/part-SSSSS"));
-    assertEquals("/foo/bar", getOutputDirectory("/foo/bar", "/part-SSSSS"));
-    assertEquals("hdfs://foo/bar", getOutputDirectory("hdfs://foo/bar", "/part-SSSSS"));
-  }
-
-  @Test
-  public void testGetOutputFilePrefix() {
-    assertEquals("foo", getOutputFilePrefix("foo", "-S-of-N"));
-    assertEquals("bar", getOutputFilePrefix("foo/bar", "-S-of-N"));
-    assertEquals("bar", getOutputFilePrefix("/foo/bar", "-S-of-N"));
-    assertEquals("bar", getOutputFilePrefix("hdfs://foo/bar", "-S-of-N"));
-    assertEquals("", getOutputFilePrefix("foo/bar", "/part-SSSSS"));
-    assertEquals("", getOutputFilePrefix("/foo/bar", "/part-SSSSS"));
-    assertEquals("", getOutputFilePrefix("hdfs://foo/bar", "/part-SSSSS"));
-  }
-
-  @Test
-  public void testGetOutputFileTemplate() {
-    assertEquals("-S-of-N", getOutputFileTemplate("foo", "-S-of-N"));
-    assertEquals("-S-of-N", getOutputFileTemplate("foo/bar", "-S-of-N"));
-    assertEquals("-S-of-N", getOutputFileTemplate("/foo/bar", "-S-of-N"));
-    assertEquals("-S-of-N", getOutputFileTemplate("hdfs://foo/bar", "-S-of-N"));
-    assertEquals("part-SSSSS", getOutputFileTemplate("foo/bar", "/part-SSSSS"));
-    assertEquals("part-SSSSS", getOutputFileTemplate("/foo/bar", "/part-SSSSS"));
-    assertEquals("part-SSSSS", getOutputFileTemplate("hdfs://foo/bar", "/part-SSSSS"));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/test/resources/test_text.txt
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/resources/test_text.txt b/runners/spark/src/test/resources/test_text.txt
deleted file mode 100644
index 6a14a1b..0000000
--- a/runners/spark/src/test/resources/test_text.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-test line 1
-test line 2
\ No newline at end of file


[2/2] beam git commit: This closes #2419

Posted by ie...@apache.org.
This closes #2419


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/45f63eb6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/45f63eb6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/45f63eb6

Branch: refs/heads/master
Commit: 45f63eb6c4d763f2ff8a3ff3e41df32aa00d1d2b
Parents: cc8e0b9 6f7defd
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue Apr 4 14:03:55 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue Apr 4 14:03:55 2017 +0200

----------------------------------------------------------------------
 runners/spark/pom.xml                           |  19 --
 .../beam/runners/spark/io/hadoop/HadoopIO.java  | 216 ----------------
 .../spark/io/hadoop/ShardNameBuilder.java       | 111 --------
 .../spark/io/hadoop/ShardNameTemplateAware.java |  31 ---
 .../io/hadoop/ShardNameTemplateHelper.java      |  63 -----
 .../io/hadoop/TemplatedAvroKeyOutputFormat.java |  45 ----
 .../TemplatedSequenceFileOutputFormat.java      |  45 ----
 .../io/hadoop/TemplatedTextOutputFormat.java    |  45 ----
 .../runners/spark/io/hadoop/package-info.java   |  22 --
 .../spark/translation/TransformTranslator.java  | 251 -------------------
 .../io/hadoop/HadoopFileFormatPipelineTest.java | 121 ---------
 .../spark/io/hadoop/ShardNameBuilderTest.java   |  88 -------
 runners/spark/src/test/resources/test_text.txt  |   2 -
 13 files changed, 1059 deletions(-)
----------------------------------------------------------------------