You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/12/21 17:24:12 UTC
[beam] 04/05: Post code review amendments
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 6bf325e5a9285aed86cc5c346208702020018e7b
Author: Luke Cwik <lc...@google.com>
AuthorDate: Wed Dec 20 11:26:51 2017 -0800
Post code review amendments
---
.../beam/runners/core/construction/PipelineResources.java | 8 +++-----
.../runners/core/construction/PipelineResourcesTest.java | 12 ++++++------
.../main/java/org/apache/beam/runners/flink/FlinkRunner.java | 2 +-
.../org/apache/beam/runners/dataflow/DataflowRunner.java | 2 +-
.../org/apache/beam/runners/spark/SparkPipelineOptions.java | 10 ----------
.../main/java/org/apache/beam/runners/spark/SparkRunner.java | 4 ++--
.../beam/runners/spark/translation/SparkContextFactory.java | 2 +-
7 files changed, 14 insertions(+), 26 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PipelineUtils.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
similarity index 93%
rename from sdks/java/core/src/main/java/org/apache/beam/sdk/util/PipelineUtils.java
rename to runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
index 1614dca..ae6b076 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PipelineUtils.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.util;
+package org.apache.beam.runners.core.construction;
import java.io.File;
import java.net.URISyntaxException;
@@ -24,10 +24,8 @@ import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
-/**
- * Utilities for working with Pipelines.
- */
-public class PipelineUtils {
+/** Utilities for working with classpath resources for pipelines. */
+public class PipelineResources {
/**
* Attempts to detect all the resources the class loader has access to. This does not recurse
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PipelineUtilsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineResourcesTest.java
similarity index 88%
rename from sdks/java/core/src/test/java/org/apache/beam/sdk/util/PipelineUtilsTest.java
rename to runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineResourcesTest.java
index 4d03615..633df01 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PipelineUtilsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineResourcesTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.util;
+package org.apache.beam.runners.core.construction;
import static org.junit.Assert.assertEquals;
@@ -32,10 +32,10 @@ import org.junit.runners.JUnit4;
import org.mockito.Mockito;
/**
- * Tests for PipelineUtils.
+ * Tests for PipelineResources.
*/
@RunWith(JUnit4.class)
-public class PipelineUtilsTest {
+public class PipelineResourcesTest {
@Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
@Rule public transient ExpectedException thrown = ExpectedException.none();
@@ -50,7 +50,7 @@ public class PipelineUtilsTest {
});
assertEquals(ImmutableList.of(file.getAbsolutePath(), file2.getAbsolutePath()),
- PipelineUtils.detectClassPathResourcesToStage(classLoader));
+ PipelineResources.detectClassPathResourcesToStage(classLoader));
}
@Test
@@ -59,7 +59,7 @@ public class PipelineUtilsTest {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Unable to use ClassLoader to detect classpath elements.");
- PipelineUtils.detectClassPathResourcesToStage(mockClassLoader);
+ PipelineResources.detectClassPathResourcesToStage(mockClassLoader);
}
@Test
@@ -71,6 +71,6 @@ public class PipelineUtilsTest {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Unable to convert url (" + url + ") to file.");
- PipelineUtils.detectClassPathResourcesToStage(classLoader);
+ PipelineResources.detectClassPathResourcesToStage(classLoader);
}
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index f2be9a7..5fdcdce 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.flink;
-import static org.apache.beam.sdk.util.PipelineUtils.detectClassPathResourcesToStage;
+import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
import com.google.common.base.Joiner;
import java.util.ArrayList;
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 729ec9c..3684b47 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -21,8 +21,8 @@ import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
-import static org.apache.beam.sdk.util.PipelineUtils.detectClassPathResourcesToStage;
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 146f25b..2db8209 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.spark;
-import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
@@ -110,15 +109,6 @@ public interface SparkPipelineOptions
*/
@Description("Jar-Files to send to all workers and put on the classpath. "
+ "The default value is all files from the classpath.")
- @Default.InstanceFactory(EmptyPathList.class)
List<String> getFilesToStage();
void setFilesToStage(List<String> value);
-
- /** Returns an empty path list, to avoid handling null. */
- class EmptyPathList implements DefaultValueFactory<List<String>> {
- @Override
- public List<String> create(PipelineOptions options) {
- return new ArrayList<>();
- }
- }
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index ccf8283..3495382 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -18,7 +18,7 @@
package org.apache.beam.runners.spark;
-import static org.apache.beam.sdk.util.PipelineUtils.detectClassPathResourcesToStage;
+import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
import com.google.common.collect.Iterables;
import java.util.Arrays;
@@ -124,7 +124,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
SparkPipelineOptions sparkOptions =
PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
- if (sparkOptions.getFilesToStage().isEmpty()) {
+ if (sparkOptions.getFilesToStage() == null) {
sparkOptions.setFilesToStage(detectClassPathResourcesToStage(
SparkRunner.class.getClassLoader()));
LOG.info("PipelineOptions.filesToStage was not specified. "
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index d0b467a..5a8ad2d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -93,7 +93,7 @@ public final class SparkContextFactory {
conf.setMaster(contextOptions.getSparkMaster());
}
- if (contextOptions.getFilesToStage().size() > 0) {
+ if (contextOptions.getFilesToStage() != null && !contextOptions.getFilesToStage().isEmpty()) {
conf.setJars(contextOptions.getFilesToStage().toArray(new String[0]));
}
--
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.