You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/04/04 12:04:37 UTC
[1/2] beam git commit: [BEAM-1875] Remove Spark-runner-custom Hadoop and Avro IOs
Repository: beam
Updated Branches:
refs/heads/master cc8e0b9df -> 45f63eb6c
[BEAM-1875] Remove Spark-runner-custom Hadoop and Avro IOs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6f7defdb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6f7defdb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6f7defdb
Branch: refs/heads/master
Commit: 6f7defdbb1aa14bf5577bf24ba1d3307cd2fc8b9
Parents: cc8e0b9
Author: Amit Sela <am...@gmail.com>
Authored: Tue Apr 4 10:31:31 2017 +0300
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue Apr 4 13:58:40 2017 +0200
----------------------------------------------------------------------
runners/spark/pom.xml | 19 --
.../beam/runners/spark/io/hadoop/HadoopIO.java | 216 ----------------
.../spark/io/hadoop/ShardNameBuilder.java | 111 --------
.../spark/io/hadoop/ShardNameTemplateAware.java | 31 ---
.../io/hadoop/ShardNameTemplateHelper.java | 63 -----
.../io/hadoop/TemplatedAvroKeyOutputFormat.java | 45 ----
.../TemplatedSequenceFileOutputFormat.java | 45 ----
.../io/hadoop/TemplatedTextOutputFormat.java | 45 ----
.../runners/spark/io/hadoop/package-info.java | 22 --
.../spark/translation/TransformTranslator.java | 251 -------------------
.../io/hadoop/HadoopFileFormatPipelineTest.java | 121 ---------
.../spark/io/hadoop/ShardNameBuilderTest.java | 88 -------
runners/spark/src/test/resources/test_text.txt | 2 -
13 files changed, 1059 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 96480cd..dd174bf 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -248,19 +248,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-mapred</artifactId>
- <version>${avro.version}</version>
- <classifier>hadoop2</classifier>
- <exclusions>
- <!-- exclude old Jetty version of servlet API -->
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${dropwizard.metrics.version}</version>
@@ -318,12 +305,6 @@
</dependency>
<dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
deleted file mode 100644
index f2457ce..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.io.hadoop;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.sdk.io.ShardNameTemplate;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-/**
- * Spark native HadoopIO.
- */
-public final class HadoopIO {
-
- private HadoopIO() {
- }
-
- /**
- * Read operation from HDFS.
- */
- public static final class Read {
-
- private Read() {
- }
-
- public static <K, V> Bound<K, V> from(String filepattern,
- Class<? extends FileInputFormat<K, V>> format, Class<K> key, Class<V> value) {
- return new Bound<>(filepattern, format, key, value);
- }
-
- /**
- * A {@link PTransform} reading bounded collection of data from HDFS.
- * @param <K> the type of the keys
- * @param <V> the type of the values
- */
- public static class Bound<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
-
- private final String filepattern;
- private final Class<? extends FileInputFormat<K, V>> formatClass;
- private final Class<K> keyClass;
- private final Class<V> valueClass;
-
- Bound(String filepattern, Class<? extends FileInputFormat<K, V>> format, Class<K> key,
- Class<V> value) {
- checkNotNull(filepattern, "need to set the filepattern of an HadoopIO.Read transform");
- checkNotNull(format, "need to set the format class of an HadoopIO.Read transform");
- checkNotNull(key, "need to set the key class of an HadoopIO.Read transform");
- checkNotNull(value, "need to set the value class of an HadoopIO.Read transform");
- this.filepattern = filepattern;
- this.formatClass = format;
- this.keyClass = key;
- this.valueClass = value;
- }
-
- public String getFilepattern() {
- return filepattern;
- }
-
- public Class<? extends FileInputFormat<K, V>> getFormatClass() {
- return formatClass;
- }
-
- public Class<V> getValueClass() {
- return valueClass;
- }
-
- public Class<K> getKeyClass() {
- return keyClass;
- }
-
- @Override
- public PCollection<KV<K, V>> expand(PBegin input) {
- return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
- WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED);
- }
-
- }
-
- }
-
- /**
- * Write operation on HDFS.
- */
- public static final class Write {
-
- private Write() {
- }
-
- public static <K, V> Bound<K, V> to(String filenamePrefix,
- Class<? extends FileOutputFormat<K, V>> format, Class<K> key, Class<V> value) {
- return new Bound<>(filenamePrefix, format, key, value);
- }
-
- /**
- * A {@link PTransform} writing {@link PCollection} on HDFS.
- */
- public static class Bound<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {
-
- /** The filename to write to. */
- private final String filenamePrefix;
- /** Suffix to use for each filename. */
- private final String filenameSuffix;
- /** Requested number of shards. 0 for automatic. */
- private final int numShards;
- /** Shard template string. */
- private final String shardTemplate;
- private final Class<? extends FileOutputFormat<K, V>> formatClass;
- private final Class<K> keyClass;
- private final Class<V> valueClass;
- private final Map<String, String> configurationProperties;
-
- Bound(String filenamePrefix, Class<? extends FileOutputFormat<K, V>> format,
- Class<K> key,
- Class<V> value) {
- this(filenamePrefix, "", 0, ShardNameTemplate.INDEX_OF_MAX, format, key, value,
- new HashMap<String, String>());
- }
-
- Bound(String filenamePrefix, String filenameSuffix, int numShards,
- String shardTemplate, Class<? extends FileOutputFormat<K, V>> format,
- Class<K> key, Class<V> value, Map<String, String> configurationProperties) {
- this.filenamePrefix = filenamePrefix;
- this.filenameSuffix = filenameSuffix;
- this.numShards = numShards;
- this.shardTemplate = shardTemplate;
- this.formatClass = format;
- this.keyClass = key;
- this.valueClass = value;
- this.configurationProperties = configurationProperties;
- }
-
- public Bound<K, V> withoutSharding() {
- return new Bound<>(filenamePrefix, filenameSuffix, 1, "", formatClass,
- keyClass, valueClass, configurationProperties);
- }
-
- public Bound<K, V> withConfigurationProperty(String key, String value) {
- configurationProperties.put(key, value);
- return this;
- }
-
- public String getFilenamePrefix() {
- return filenamePrefix;
- }
-
- public String getShardTemplate() {
- return shardTemplate;
- }
-
- public int getNumShards() {
- return numShards;
- }
-
- public String getFilenameSuffix() {
- return filenameSuffix;
- }
-
- public Class<? extends FileOutputFormat<K, V>> getFormatClass() {
- return formatClass;
- }
-
- public Class<V> getValueClass() {
- return valueClass;
- }
-
- public Class<K> getKeyClass() {
- return keyClass;
- }
-
- public Map<String, String> getConfigurationProperties() {
- return configurationProperties;
- }
-
- @Override
- public PDone expand(PCollection<KV<K, V>> input) {
- checkNotNull(
- filenamePrefix, "need to set the filename prefix of an HadoopIO.Write transform");
- checkNotNull(formatClass, "need to set the format class of an HadoopIO.Write transform");
- checkNotNull(keyClass, "need to set the key class of an HadoopIO.Write transform");
- checkNotNull(valueClass, "need to set the value class of an HadoopIO.Write transform");
-
- checkArgument(
- ShardNameTemplateAware.class.isAssignableFrom(formatClass),
- "Format class must implement %s",
- ShardNameTemplateAware.class.getName());
-
- return PDone.in(input.getPipeline());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java
deleted file mode 100644
index 11b4b53..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.io.hadoop;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Shard name builder.
- */
-public final class ShardNameBuilder {
-
- private ShardNameBuilder() {
- }
-
- /**
- * Replace occurrences of uppercase letters 'N' with the given {code}shardCount{code},
- * left-padded with zeros if necessary.
- * @see org.apache.beam.sdk.io.ShardNameTemplate
- * @param template the string template containing uppercase letters 'N'
- * @param shardCount the total number of shards
- * @return a string template with 'N' replaced by the shard count
- */
- public static String replaceShardCount(String template, int shardCount) {
- return replaceShardPattern(template, "N+", shardCount);
- }
-
- /**
- * Replace occurrences of uppercase letters 'S' with the given {code}shardNumber{code},
- * left-padded with zeros if necessary.
- * @see org.apache.beam.sdk.io.ShardNameTemplate
- * @param template the string template containing uppercase letters 'S'
- * @param shardNumber the number of a particular shard
- * @return a string template with 'S' replaced by the shard number
- */
- public static String replaceShardNumber(String template, int shardNumber) {
- return replaceShardPattern(template, "S+", shardNumber);
- }
-
- private static String replaceShardPattern(String template, String pattern, int n) {
- Pattern p = Pattern.compile(pattern);
- Matcher m = p.matcher(template);
- StringBuffer sb = new StringBuffer();
- while (m.find()) {
- // replace pattern with a String format string:
- // index 1, zero-padding flag (0), width length of matched pattern, decimal conversion
- m.appendReplacement(sb, "%1\\$0" + m.group().length() + "d");
- }
- m.appendTail(sb);
- return String.format(sb.toString(), n);
- }
-
- /**
- * @param pathPrefix a relative or absolute path
- * @param template a template string
- * @return the output directory for the given prefix, template and suffix
- */
- public static String getOutputDirectory(String pathPrefix, String template) {
- String out = new Path(pathPrefix + template).getParent().toString();
- if (out.isEmpty()) {
- return "./";
- }
- return out;
- }
-
- /**
- * @param pathPrefix a relative or absolute path
- * @param template a template string
- * @return the prefix of the output filename for the given path prefix and template
- */
- public static String getOutputFilePrefix(String pathPrefix, String template) {
- String name = new Path(pathPrefix + template).getName();
- if (name.endsWith(template)) {
- return name.substring(0, name.length() - template.length());
- } else {
- return "";
- }
- }
-
- /**
- * @param pathPrefix a relative or absolute path
- * @param template a template string
- * @return the template for the output filename for the given path prefix and
- * template
- */
- public static String getOutputFileTemplate(String pathPrefix, String template) {
- String name = new Path(pathPrefix + template).getName();
- if (name.endsWith(template)) {
- return template;
- } else {
- return name;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateAware.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateAware.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateAware.java
deleted file mode 100644
index d78b437..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateAware.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.io.hadoop;
-
-/**
- * A marker interface that implementations of
- * {@link org.apache.hadoop.mapreduce.lib.output.FileOutputFormat} implement to indicate
- * that they produce shard names that adhere to the template in
- * {@link HadoopIO.Write}.
- *
- * <p>Some common shard names are defined in
- * {@link org.apache.beam.sdk.io.ShardNameTemplate}.
- */
-public interface ShardNameTemplateAware {
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
deleted file mode 100644
index 4a7058b..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.io.hadoop;
-
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardNumber;
-
-import java.io.IOException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Shard name template helper.
- */
-public final class ShardNameTemplateHelper {
-
- private static final Logger LOG = LoggerFactory.getLogger(ShardNameTemplateHelper.class);
-
- public static final String OUTPUT_FILE_PREFIX = "spark.beam.fileoutputformat.prefix";
- public static final String OUTPUT_FILE_TEMPLATE = "spark.beam.fileoutputformat.template";
- public static final String OUTPUT_FILE_SUFFIX = "spark.beam.fileoutputformat.suffix";
-
- private ShardNameTemplateHelper() {
- }
-
- public static <K, V> Path getDefaultWorkFile(FileOutputFormat<K, V> format,
- TaskAttemptContext context) throws IOException {
- FileOutputCommitter committer =
- (FileOutputCommitter) format.getOutputCommitter(context);
- return new Path(committer.getWorkPath(), getOutputFile(context));
- }
-
- private static String getOutputFile(TaskAttemptContext context) {
- TaskID taskId = context.getTaskAttemptID().getTaskID();
- int partition = taskId.getId();
-
- String filePrefix = context.getConfiguration().get(OUTPUT_FILE_PREFIX);
- String fileTemplate = context.getConfiguration().get(OUTPUT_FILE_TEMPLATE);
- String fileSuffix = context.getConfiguration().get(OUTPUT_FILE_SUFFIX);
- return filePrefix + replaceShardNumber(fileTemplate, partition) + fileSuffix;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
deleted file mode 100644
index 62a610b..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.io.hadoop;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * Templated Avro key output format.
- */
-public class TemplatedAvroKeyOutputFormat<T> extends AvroKeyOutputFormat<T>
- implements ShardNameTemplateAware {
-
- @Override
- public void checkOutputSpecs(JobContext job) {
- // don't fail if the output already exists
- }
-
- @Override
- protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException {
- Path path = ShardNameTemplateHelper.getDefaultWorkFile(this, context);
- return path.getFileSystem(context.getConfiguration()).create(path);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
deleted file mode 100644
index ab1263b..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.io.hadoop;
-
-import java.io.IOException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-
-/**
- * Templated sequence file output format.
- */
-public class TemplatedSequenceFileOutputFormat<K, V> extends SequenceFileOutputFormat<K, V>
- implements ShardNameTemplateAware {
-
- @Override
- public void checkOutputSpecs(JobContext job) {
- // don't fail if the output already exists
- }
-
- @Override
- public Path getDefaultWorkFile(TaskAttemptContext context,
- String extension) throws IOException {
- // note that the passed-in extension is ignored since it comes from the template
- return ShardNameTemplateHelper.getDefaultWorkFile(this, context);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
deleted file mode 100644
index 5a6e9a9..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.io.hadoop;
-
-import java.io.IOException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
-/**
- * Templates text output format.
- */
-public class TemplatedTextOutputFormat<K, V> extends TextOutputFormat<K, V>
- implements ShardNameTemplateAware {
-
- @Override
- public void checkOutputSpecs(JobContext job) {
- // don't fail if the output already exists
- }
-
- @Override
- public Path getDefaultWorkFile(TaskAttemptContext context,
- String extension) throws IOException {
- // note that the passed-in extension is ignored since it comes from the template
- return ShardNameTemplateHelper.getDefaultWorkFile(this, context);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/package-info.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/package-info.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/package-info.java
deleted file mode 100644
index 70cd0f3..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Spark-specific transforms for reading from and writing to Hadoop file systems (HDFS).
- */
-package org.apache.beam.runners.spark.io.hadoop;
http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index d88ef7e..6290bba 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -20,10 +20,6 @@ package org.apache.beam.runners.spark.translation;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectSplittable;
import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers;
@@ -31,22 +27,14 @@ import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.io.SourceRDD;
-import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
-import org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper;
-import org.apache.beam.runners.spark.io.hadoop.TemplatedAvroKeyOutputFormat;
-import org.apache.beam.runners.spark.io.hadoop.TemplatedTextOutputFormat;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
import org.apache.beam.runners.spark.util.SideInputBroadcast;
@@ -54,9 +42,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.Create;
@@ -78,18 +64,11 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Tuple2;
/**
@@ -427,119 +406,6 @@ public final class TransformTranslator {
};
}
-
- private static <T> TransformEvaluator<TextIO.Read.Bound> readText() {
- return new TransformEvaluator<TextIO.Read.Bound>() {
- @Override
- public void evaluate(TextIO.Read.Bound transform, EvaluationContext context) {
- String pattern = transform.getFilepattern();
- JavaRDD<WindowedValue<String>> rdd = context.getSparkContext().textFile(pattern)
- .map(WindowingHelpers.<String>windowFunction());
- context.putDataset(transform, new BoundedDataset<>(rdd));
- }
-
- @Override
- public String toNativeString() {
- return "sparkContext.textFile(...)";
- }
- };
- }
-
- private static <T> TransformEvaluator<TextIO.Write.Bound> writeText() {
- return new TransformEvaluator<TextIO.Write.Bound>() {
- @Override
- public void evaluate(TextIO.Write.Bound transform, EvaluationContext context) {
- @SuppressWarnings("unchecked")
- JavaPairRDD<T, Void> last =
- ((BoundedDataset<T>) context.borrowDataset(transform)).getRDD()
- .map(WindowingHelpers.<T>unwindowFunction())
- .mapToPair(new PairFunction<T, T,
- Void>() {
- @Override
- public Tuple2<T, Void> call(T t) throws Exception {
- return new Tuple2<>(t, null);
- }
- });
- ShardTemplateInformation shardTemplateInfo =
- new ShardTemplateInformation(transform.getNumShards(),
- transform.getShardTemplate(), transform.getFilenamePrefix(),
- transform.getFilenameSuffix());
- writeHadoopFile(last, new Configuration(), shardTemplateInfo, Text.class,
- NullWritable.class, TemplatedTextOutputFormat.class);
- }
-
- @Override
- public String toNativeString() {
- return "saveAsNewAPIHadoopFile(...)";
- }
- };
- }
-
- private static <T> TransformEvaluator<AvroIO.Read.Bound<T>> readAvro() {
- return new TransformEvaluator<AvroIO.Read.Bound<T>>() {
- @Override
- public void evaluate(AvroIO.Read.Bound<T> transform, EvaluationContext context) {
- String pattern = transform.getFilepattern();
- JavaSparkContext jsc = context.getSparkContext();
- @SuppressWarnings("unchecked")
- JavaRDD<AvroKey<T>> avroFile = (JavaRDD<AvroKey<T>>) (JavaRDD<?>)
- jsc.newAPIHadoopFile(pattern,
- AvroKeyInputFormat.class,
- AvroKey.class, NullWritable.class,
- new Configuration()).keys();
- JavaRDD<WindowedValue<T>> rdd = avroFile.map(
- new Function<AvroKey<T>, T>() {
- @Override
- public T call(AvroKey<T> key) {
- return key.datum();
- }
- }).map(WindowingHelpers.<T>windowFunction());
- context.putDataset(transform, new BoundedDataset<>(rdd));
- }
-
- @Override
- public String toNativeString() {
- return "sparkContext.newAPIHadoopFile(...)";
- }
- };
- }
-
- private static <T> TransformEvaluator<AvroIO.Write.Bound<T>> writeAvro() {
- return new TransformEvaluator<AvroIO.Write.Bound<T>>() {
- @Override
- public void evaluate(AvroIO.Write.Bound<T> transform, EvaluationContext context) {
- Job job;
- try {
- job = Job.getInstance();
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- AvroJob.setOutputKeySchema(job, transform.getSchema());
- @SuppressWarnings("unchecked")
- JavaPairRDD<AvroKey<T>, NullWritable> last =
- ((BoundedDataset<T>) context.borrowDataset(transform)).getRDD()
- .map(WindowingHelpers.<T>unwindowFunction())
- .mapToPair(new PairFunction<T, AvroKey<T>, NullWritable>() {
- @Override
- public Tuple2<AvroKey<T>, NullWritable> call(T t) throws Exception {
- return new Tuple2<>(new AvroKey<>(t), NullWritable.get());
- }
- });
- ShardTemplateInformation shardTemplateInfo =
- new ShardTemplateInformation(transform.getNumShards(),
- transform.getShardTemplate(), transform.getFilenamePrefix(),
- transform.getFilenameSuffix());
- writeHadoopFile(last, job.getConfiguration(), shardTemplateInfo,
- AvroKey.class, NullWritable.class, TemplatedAvroKeyOutputFormat.class);
- }
-
- @Override
- public String toNativeString() {
- return "mapToPair(<objectToAvroKeyFn>).saveAsNewAPIHadoopFile(...)";
- }
- };
- }
-
private static <T> TransformEvaluator<Read.Bounded<T>> readBounded() {
return new TransformEvaluator<Read.Bounded<T>>() {
@Override
@@ -561,121 +427,6 @@ public final class TransformTranslator {
};
}
- private static <K, V> TransformEvaluator<HadoopIO.Read.Bound<K, V>> readHadoop() {
- return new TransformEvaluator<HadoopIO.Read.Bound<K, V>>() {
- @Override
- public void evaluate(HadoopIO.Read.Bound<K, V> transform, EvaluationContext context) {
- String pattern = transform.getFilepattern();
- JavaSparkContext jsc = context.getSparkContext();
- @SuppressWarnings("unchecked")
- JavaPairRDD<K, V> file = jsc.newAPIHadoopFile(pattern,
- transform.getFormatClass(),
- transform.getKeyClass(), transform.getValueClass(),
- new Configuration());
- JavaRDD<WindowedValue<KV<K, V>>> rdd =
- file.map(new Function<Tuple2<K, V>, KV<K, V>>() {
- @Override
- public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
- return KV.of(t2._1(), t2._2());
- }
- }).map(WindowingHelpers.<KV<K, V>>windowFunction());
- context.putDataset(transform, new BoundedDataset<>(rdd));
- }
-
- @Override
- public String toNativeString() {
- return "sparkContext.newAPIHadoopFile(...)";
- }
- };
- }
-
- private static <K, V> TransformEvaluator<HadoopIO.Write.Bound<K, V>> writeHadoop() {
- return new TransformEvaluator<HadoopIO.Write.Bound<K, V>>() {
- @Override
- public void evaluate(HadoopIO.Write.Bound<K, V> transform, EvaluationContext context) {
- @SuppressWarnings("unchecked")
- JavaPairRDD<K, V> last = ((BoundedDataset<KV<K, V>>) context.borrowDataset(transform))
- .getRDD()
- .map(WindowingHelpers.<KV<K, V>>unwindowFunction())
- .mapToPair(new PairFunction<KV<K, V>, K, V>() {
- @Override
- public Tuple2<K, V> call(KV<K, V> t) throws Exception {
- return new Tuple2<>(t.getKey(), t.getValue());
- }
- });
- ShardTemplateInformation shardTemplateInfo =
- new ShardTemplateInformation(transform.getNumShards(),
- transform.getShardTemplate(), transform.getFilenamePrefix(),
- transform.getFilenameSuffix());
- Configuration conf = new Configuration();
- for (Map.Entry<String, String> e : transform.getConfigurationProperties().entrySet()) {
- conf.set(e.getKey(), e.getValue());
- }
- writeHadoopFile(last, conf, shardTemplateInfo,
- transform.getKeyClass(), transform.getValueClass(), transform.getFormatClass());
- }
-
- @Override
- public String toNativeString() {
- return "saveAsNewAPIHadoopFile(...)";
- }
- };
- }
-
- private static final class ShardTemplateInformation {
- private final int numShards;
- private final String shardTemplate;
- private final String filenamePrefix;
- private final String filenameSuffix;
-
- private ShardTemplateInformation(int numShards, String shardTemplate, String
- filenamePrefix, String filenameSuffix) {
- this.numShards = numShards;
- this.shardTemplate = shardTemplate;
- this.filenamePrefix = filenamePrefix;
- this.filenameSuffix = filenameSuffix;
- }
-
- int getNumShards() {
- return numShards;
- }
-
- String getShardTemplate() {
- return shardTemplate;
- }
-
- String getFilenamePrefix() {
- return filenamePrefix;
- }
-
- String getFilenameSuffix() {
- return filenameSuffix;
- }
- }
-
- private static <K, V> void writeHadoopFile(JavaPairRDD<K, V> rdd, Configuration conf,
- ShardTemplateInformation shardTemplateInfo, Class<?> keyClass, Class<?> valueClass,
- Class<? extends FileOutputFormat> formatClass) {
- int numShards = shardTemplateInfo.getNumShards();
- String shardTemplate = shardTemplateInfo.getShardTemplate();
- String filenamePrefix = shardTemplateInfo.getFilenamePrefix();
- String filenameSuffix = shardTemplateInfo.getFilenameSuffix();
- if (numShards != 0) {
- // number of shards was set explicitly, so repartition
- rdd = rdd.repartition(numShards);
- }
- int actualNumShards = rdd.partitions().size();
- String template = replaceShardCount(shardTemplate, actualNumShards);
- String outputDir = getOutputDirectory(filenamePrefix, template);
- String filePrefix = getOutputFilePrefix(filenamePrefix, template);
- String fileTemplate = getOutputFileTemplate(filenamePrefix, template);
-
- conf.set(ShardNameTemplateHelper.OUTPUT_FILE_PREFIX, filePrefix);
- conf.set(ShardNameTemplateHelper.OUTPUT_FILE_TEMPLATE, fileTemplate);
- conf.set(ShardNameTemplateHelper.OUTPUT_FILE_SUFFIX, filenameSuffix);
- rdd.saveAsNewAPIHadoopFile(outputDir, keyClass, valueClass, formatClass, conf);
- }
-
private static <T, W extends BoundedWindow> TransformEvaluator<Window.Assign<T>> window() {
return new TransformEvaluator<Window.Assign<T>>() {
@Override
@@ -847,8 +598,6 @@ public final class TransformTranslator {
static {
EVALUATORS.put(Read.Bounded.class, readBounded());
- EVALUATORS.put(HadoopIO.Read.Bound.class, readHadoop());
- EVALUATORS.put(HadoopIO.Write.Bound.class, writeHadoop());
EVALUATORS.put(ParDo.MultiOutput.class, parDo());
EVALUATORS.put(GroupByKey.class, groupByKey());
EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());
http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
deleted file mode 100644
index 48b5433..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.io.hadoop;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import org.apache.beam.runners.spark.PipelineRule;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.io.hadoop.WritableCoder;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Pipeline on the Hadoop file format test.
- */
-public class HadoopFileFormatPipelineTest {
-
- private File inputFile;
- private File outputFile;
-
- @Rule
- public final PipelineRule pipelineRule = PipelineRule.batch();
-
- @Rule
- public final TemporaryFolder tmpDir = new TemporaryFolder();
-
- @Before
- public void setUp() throws IOException {
- inputFile = tmpDir.newFile("test.seq");
- outputFile = tmpDir.newFolder("out");
- outputFile.delete();
- }
-
- @Test
- public void testSequenceFile() throws Exception {
- populateFile();
-
- Pipeline p = pipelineRule.createPipeline();
- @SuppressWarnings("unchecked")
- Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass =
- (Class<? extends FileInputFormat<IntWritable, Text>>)
- (Class<?>) SequenceFileInputFormat.class;
- HadoopIO.Read.Bound<IntWritable, Text> read =
- HadoopIO.Read.from(inputFile.getAbsolutePath(),
- inputFormatClass,
- IntWritable.class,
- Text.class);
- PCollection<KV<IntWritable, Text>> input = p.apply(read)
- .setCoder(KvCoder.of(WritableCoder.of(IntWritable.class), WritableCoder.of(Text.class)));
- @SuppressWarnings("unchecked")
- Class<? extends FileOutputFormat<IntWritable, Text>> outputFormatClass =
- (Class<? extends FileOutputFormat<IntWritable, Text>>)
- (Class<?>) TemplatedSequenceFileOutputFormat.class;
- @SuppressWarnings("unchecked")
- HadoopIO.Write.Bound<IntWritable, Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(),
- outputFormatClass, IntWritable.class, Text.class);
- input.apply(write.withoutSharding());
- p.run().waitUntilFinish();
-
- IntWritable key = new IntWritable();
- Text value = new Text();
- try (Reader reader = new Reader(new Configuration(),
- Reader.file(new Path(outputFile.toURI())))) {
- int i = 0;
- while (reader.next(key, value)) {
- assertEquals(i, key.get());
- assertEquals("value-" + i, value.toString());
- i++;
- }
- }
- }
-
- private void populateFile() throws IOException {
- IntWritable key = new IntWritable();
- Text value = new Text();
- try (Writer writer = SequenceFile.createWriter(
- new Configuration(),
- Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
- Writer.file(new Path(this.inputFile.toURI())))) {
- for (int i = 0; i < 5; i++) {
- key.set(i);
- value.set("value-" + i);
- writer.append(key, value);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
deleted file mode 100644
index 1f2cf63..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.io.hadoop;
-
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardNumber;
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-/**
- * Test on the {@link ShardNameBuilder}.
- */
-public class ShardNameBuilderTest {
-
- @Test
- public void testReplaceShardCount() {
- assertEquals("", replaceShardCount("", 6));
- assertEquals("-S-of-6", replaceShardCount("-S-of-N", 6));
- assertEquals("-SS-of-06", replaceShardCount("-SS-of-NN", 6));
- assertEquals("-S-of-60", replaceShardCount("-S-of-N", 60));
- assertEquals("-SS-of-60", replaceShardCount("-SS-of-NN", 60));
- assertEquals("/part-SSSSS", replaceShardCount("/part-SSSSS", 6));
- }
-
- @Test
- public void testReplaceShardNumber() {
- assertEquals("", replaceShardNumber("", 5));
- assertEquals("-5-of-6", replaceShardNumber("-S-of-6", 5));
- assertEquals("-05-of-06", replaceShardNumber("-SS-of-06", 5));
- assertEquals("-59-of-60", replaceShardNumber("-S-of-60", 59));
- assertEquals("-59-of-60", replaceShardNumber("-SS-of-60", 59));
- assertEquals("/part-00005", replaceShardNumber("/part-SSSSS", 5));
- }
-
- @Test
- public void testGetOutputDirectory() {
- assertEquals("./", getOutputDirectory("foo", "-S-of-N"));
- assertEquals("foo", getOutputDirectory("foo/bar", "-S-of-N"));
- assertEquals("/foo", getOutputDirectory("/foo/bar", "-S-of-N"));
- assertEquals("hdfs://foo/", getOutputDirectory("hdfs://foo/bar", "-S-of-N"));
- assertEquals("foo/bar", getOutputDirectory("foo/bar", "/part-SSSSS"));
- assertEquals("/foo/bar", getOutputDirectory("/foo/bar", "/part-SSSSS"));
- assertEquals("hdfs://foo/bar", getOutputDirectory("hdfs://foo/bar", "/part-SSSSS"));
- }
-
- @Test
- public void testGetOutputFilePrefix() {
- assertEquals("foo", getOutputFilePrefix("foo", "-S-of-N"));
- assertEquals("bar", getOutputFilePrefix("foo/bar", "-S-of-N"));
- assertEquals("bar", getOutputFilePrefix("/foo/bar", "-S-of-N"));
- assertEquals("bar", getOutputFilePrefix("hdfs://foo/bar", "-S-of-N"));
- assertEquals("", getOutputFilePrefix("foo/bar", "/part-SSSSS"));
- assertEquals("", getOutputFilePrefix("/foo/bar", "/part-SSSSS"));
- assertEquals("", getOutputFilePrefix("hdfs://foo/bar", "/part-SSSSS"));
- }
-
- @Test
- public void testGetOutputFileTemplate() {
- assertEquals("-S-of-N", getOutputFileTemplate("foo", "-S-of-N"));
- assertEquals("-S-of-N", getOutputFileTemplate("foo/bar", "-S-of-N"));
- assertEquals("-S-of-N", getOutputFileTemplate("/foo/bar", "-S-of-N"));
- assertEquals("-S-of-N", getOutputFileTemplate("hdfs://foo/bar", "-S-of-N"));
- assertEquals("part-SSSSS", getOutputFileTemplate("foo/bar", "/part-SSSSS"));
- assertEquals("part-SSSSS", getOutputFileTemplate("/foo/bar", "/part-SSSSS"));
- assertEquals("part-SSSSS", getOutputFileTemplate("hdfs://foo/bar", "/part-SSSSS"));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6f7defdb/runners/spark/src/test/resources/test_text.txt
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/resources/test_text.txt b/runners/spark/src/test/resources/test_text.txt
deleted file mode 100644
index 6a14a1b..0000000
--- a/runners/spark/src/test/resources/test_text.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-test line 1
-test line 2
\ No newline at end of file
[2/2] beam git commit: This closes #2419
Posted by ie...@apache.org.
This closes #2419
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/45f63eb6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/45f63eb6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/45f63eb6
Branch: refs/heads/master
Commit: 45f63eb6c4d763f2ff8a3ff3e41df32aa00d1d2b
Parents: cc8e0b9 6f7defd
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue Apr 4 14:03:55 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue Apr 4 14:03:55 2017 +0200
----------------------------------------------------------------------
runners/spark/pom.xml | 19 --
.../beam/runners/spark/io/hadoop/HadoopIO.java | 216 ----------------
.../spark/io/hadoop/ShardNameBuilder.java | 111 --------
.../spark/io/hadoop/ShardNameTemplateAware.java | 31 ---
.../io/hadoop/ShardNameTemplateHelper.java | 63 -----
.../io/hadoop/TemplatedAvroKeyOutputFormat.java | 45 ----
.../TemplatedSequenceFileOutputFormat.java | 45 ----
.../io/hadoop/TemplatedTextOutputFormat.java | 45 ----
.../runners/spark/io/hadoop/package-info.java | 22 --
.../spark/translation/TransformTranslator.java | 251 -------------------
.../io/hadoop/HadoopFileFormatPipelineTest.java | 121 ---------
.../spark/io/hadoop/ShardNameBuilderTest.java | 88 -------
runners/spark/src/test/resources/test_text.txt | 2 -
13 files changed, 1059 deletions(-)
----------------------------------------------------------------------