You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/08/19 22:21:33 UTC
git commit: FALCON-598 ProcessHelper throws NPE if the process has no
inputs OR no outputs defined. Contributed by Balu Vellanki
Repository: incubator-falcon
Updated Branches:
refs/heads/master 7827c39ea -> 85427981a
FALCON-598 ProcessHelper throws NPE if the process has no inputs OR no outputs defined. Contributed by Balu Vellanki
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/85427981
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/85427981
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/85427981
Branch: refs/heads/master
Commit: 85427981afeacef84ef3c10fc5fc4190cf291545
Parents: 7827c39
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Tue Aug 19 13:21:30 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Tue Aug 19 13:21:30 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../org/apache/falcon/resource/EntityList.java | 16 ++--
.../org/apache/falcon/entity/ProcessHelper.java | 14 ++--
.../EntityRelationshipGraphBuilder.java | 14 +++-
.../metadata/MetadataMappingServiceTest.java | 38 +++++++++-
.../OozieProcessWorkflowBuilderTest.java | 79 ++++++++++++++++++--
.../config/process/process-no-inputs.xml | 43 +++++++++++
.../config/process/process-no-outputs.xml | 43 +++++++++++
.../falcon/rerun/handler/LateRerunHandler.java | 12 +--
9 files changed, 233 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/85427981/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 99ed6a6..ae8e03d 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -61,6 +61,9 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-598 ProcessHelper throws NPE if the process has no inputs OR no
+ outputs defined (Balu Vellanki via Venkatesh Seetharam)
+
FALCON-583 Post processing is broken in current trunk
(Venkatesh Seetharam via Suhas Vasu)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/85427981/client/src/main/java/org/apache/falcon/resource/EntityList.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/EntityList.java b/client/src/main/java/org/apache/falcon/resource/EntityList.java
index ae51172..2de177d 100644
--- a/client/src/main/java/org/apache/falcon/resource/EntityList.java
+++ b/client/src/main/java/org/apache/falcon/resource/EntityList.java
@@ -140,14 +140,18 @@ public class EntityList {
}
if (process != null) {
- for (Input i : process.getInputs().getInputs()) {
- if (i.getFeed().equals(entityNameToMatch)) {
- tagList.add("Input");
+ if (process.getInputs() != null) {
+ for (Input i : process.getInputs().getInputs()) {
+ if (i.getFeed().equals(entityNameToMatch)) {
+ tagList.add("Input");
+ }
}
}
- for (Output o : process.getOutputs().getOutputs()) {
- if (o.getFeed().equals(entityNameToMatch)) {
- tagList.add("Output");
+ if (process.getOutputs() != null) {
+ for (Output o : process.getOutputs().getOutputs()) {
+ if (o.getFeed().equals(entityNameToMatch)) {
+ tagList.add("Output");
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/85427981/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index 59361e8..8e0c87a 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -59,16 +59,18 @@ public final class ProcessHelper {
return storageType;
}
- for (Input input : process.getInputs().getInputs()) {
- Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
- storageType = FeedHelper.getStorageType(feed, cluster);
- if (Storage.TYPE.TABLE == storageType) {
- break;
+ if (process.getInputs() != null) {
+ for (Input input : process.getInputs().getInputs()) {
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+ storageType = FeedHelper.getStorageType(feed, cluster);
+ if (Storage.TYPE.TABLE == storageType) {
+ break;
+ }
}
}
// If input feeds storage type is file system check storage type of output feeds
- if (Storage.TYPE.FILESYSTEM == storageType) {
+ if (process.getOutputs() != null && Storage.TYPE.FILESYSTEM == storageType) {
for (Output output : process.getOutputs().getOutputs()) {
Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
storageType = FeedHelper.getStorageType(feed, cluster);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/85427981/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
index 29448bf..1b7a068 100644
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
@@ -358,8 +358,11 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
}
public static boolean areSame(Inputs oldProcessInputs, Inputs newProcessInputs) {
- if (oldProcessInputs == null && newProcessInputs == null
- || oldProcessInputs == null || newProcessInputs == null
+ if (oldProcessInputs == null && newProcessInputs == null) {
+ return true;
+ }
+
+ if (oldProcessInputs == null || newProcessInputs == null
|| oldProcessInputs.getInputs().size() != newProcessInputs.getInputs().size()) {
return false;
}
@@ -418,8 +421,11 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
}
public static boolean areSame(Outputs oldProcessOutputs, Outputs newProcessOutputs) {
- if (oldProcessOutputs == null && newProcessOutputs == null
- || oldProcessOutputs == null || newProcessOutputs == null
+ if (oldProcessOutputs == null && newProcessOutputs == null) {
+ return true;
+ }
+
+ if (oldProcessOutputs == null || newProcessOutputs == null
|| oldProcessOutputs.getOutputs().size() != newProcessOutputs.getOutputs().size()) {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/85427981/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 87779d9..b51caf8 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -34,6 +34,10 @@ import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.entity.v0.process.EngineType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Inputs;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Outputs;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.service.Services;
@@ -261,7 +265,8 @@ public class MetadataMappingServiceTest {
Feed oldFeed = inputFeeds.get(0);
Feed newFeed = EntityBuilderTestUtil.buildFeed(oldFeed.getName(), clusterEntity,
"classified-as=Secured,source=data-warehouse", "reporting");
- addStorage(newFeed, Storage.TYPE.FILESYSTEM, "jail://global:00/falcon/impression-feed/20140101");
+ addStorage(newFeed, Storage.TYPE.FILESYSTEM,
+ "jail://global:00/falcon/impression-feed/20140101");
try {
configStore.initiateUpdate(newFeed);
@@ -324,6 +329,37 @@ public class MetadataMappingServiceTest {
Assert.assertEquals(getEdgesCount(service.getGraph()), 69); // -6 = -2 outputs, -1 tag, -1 cluster, -2 pipelines
}
+ @Test(dependsOnMethods = "testOnProcessEntityChange")
+ public void testAreSame() throws Exception {
+
+ Inputs inputs1 = new Inputs();
+ Inputs inputs2 = new Inputs();
+ Outputs outputs1 = new Outputs();
+ Outputs outputs2 = new Outputs();
+ // return true when both are empty (non-null containers with no entries added yet)
+ Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(inputs1, inputs2));
+ Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2));
+
+ Input i1 = new Input();
+ i1.setName("input1");
+ Input i2 = new Input();
+ i2.setName("input2");
+ Output o1 = new Output();
+ o1.setName("output1");
+ Output o2 = new Output();
+ o2.setName("output2");
+
+ inputs1.getInputs().add(i1);
+ Assert.assertFalse(EntityRelationshipGraphBuilder.areSame(inputs1, inputs2));
+ outputs1.getOutputs().add(o1);
+ Assert.assertFalse(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2));
+
+ inputs2.getInputs().add(i1);
+ Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(inputs1, inputs2));
+ outputs2.getOutputs().add(o1);
+ Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2));
+ }
+
private void verifyUpdatedEdges(Process newProcess) {
Vertex processVertex = getEntityVertex(newProcess.getName(), RelationshipType.PROCESS_ENTITY);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/85427981/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 45badee..4daf5d8 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -461,7 +461,8 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
ACTION hiveNode = getAction(parentWorkflow, "user-action");
- JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
+ JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(
+ hiveNode);
org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
Assert.assertEquals(hiveAction.getScript(),
@@ -572,14 +573,18 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
private Map<String, String> getExpectedProperties(Feed inFeed, Feed outFeed,
Process process) throws FalconException {
Map<String, String> expected = new HashMap<String, String>();
- for (Input input : process.getInputs().getInputs()) {
- CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, inFeed);
- propagateStorageProperties(input.getName(), storage, expected);
+ if (process.getInputs() != null) {
+ for (Input input : process.getInputs().getInputs()) {
+ CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, inFeed);
+ propagateStorageProperties(input.getName(), storage, expected);
+ }
}
- for (Output output : process.getOutputs().getOutputs()) {
- CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, outFeed);
- propagateStorageProperties(output.getName(), storage, expected);
+ if (process.getOutputs() != null) {
+ for (Output output : process.getOutputs().getOutputs()) {
+ CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, outFeed);
+ propagateStorageProperties(output.getName(), storage, expected);
+ }
}
return expected;
@@ -696,4 +701,64 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
Assert.assertTrue(props.containsKey(property), "expected property missing: " + property);
}
}
+
+ @Test
+ public void testProcessWithInputsNoOutputs() throws Exception {
+ ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
+
+ URL resource = this.getClass().getResource("/config/process/process-no-outputs.xml");
+ Process processEntity = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+ ConfigurationStore.get().publish(EntityType.PROCESS, processEntity);
+
+ OozieEntityBuilder builder = OozieEntityBuilder.get(processEntity);
+ Path bundlePath = new Path("/falcon/staging/workflows", processEntity.getName());
+ builder.build(cluster, bundlePath);
+ assertTrue(fs.exists(bundlePath));
+
+ BUNDLEAPP bundle = getBundle(fs, bundlePath);
+ assertEquals(EntityUtil.getWorkflowName(processEntity).toString(), bundle.getName());
+ assertEquals(1, bundle.getCoordinator().size());
+ assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, processEntity).toString(),
+ bundle.getCoordinator().get(0).getName());
+ String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+ COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
+ HashMap<String, String> props = getCoordProperties(coord);
+ verifyEntityProperties(processEntity, cluster,
+ WorkflowExecutionContext.EntityOperations.GENERATE, props);
+ verifyBrokerProperties(cluster, props);
+
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "clicks");
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.FEED_NAMES.getName()), "NONE");
+ }
+
+ @Test
+ public void testProcessNoInputsWithOutputs() throws Exception {
+ ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
+
+ URL resource = this.getClass().getResource("/config/process/process-no-inputs.xml");
+ Process processEntity = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+ ConfigurationStore.get().publish(EntityType.PROCESS, processEntity);
+
+ OozieEntityBuilder builder = OozieEntityBuilder.get(processEntity);
+ Path bundlePath = new Path("/falcon/staging/workflows", processEntity.getName());
+ builder.build(cluster, bundlePath);
+ assertTrue(fs.exists(bundlePath));
+
+ BUNDLEAPP bundle = getBundle(fs, bundlePath);
+ assertEquals(EntityUtil.getWorkflowName(processEntity).toString(), bundle.getName());
+ assertEquals(1, bundle.getCoordinator().size());
+ assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, processEntity).toString(),
+ bundle.getCoordinator().get(0).getName());
+ String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+ COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
+ HashMap<String, String> props = getCoordProperties(coord);
+ verifyEntityProperties(processEntity, cluster,
+ WorkflowExecutionContext.EntityOperations.GENERATE, props);
+ verifyBrokerProperties(cluster, props);
+
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.FEED_NAMES.getName()), "impressions");
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "NONE");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/85427981/oozie/src/test/resources/config/process/process-no-inputs.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/process/process-no-inputs.xml b/oozie/src/test/resources/config/process/process-no-inputs.xml
new file mode 100644
index 0000000..85d01db
--- /dev/null
+++ b/oozie/src/test/resources/config/process/process-no-inputs.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<process name="process-no-inputs" xmlns="uri:falcon:process:0.1">
+ <!-- where -->
+ <clusters>
+ <cluster name="corp">
+ <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+ </cluster>
+ </clusters>
+
+ <!-- when -->
+ <parallel>1</parallel>
+ <order>LIFO</order>
+ <frequency>hours(1)</frequency>
+ <timezone>UTC</timezone>
+
+ <!-- what = outputs only -->
+ <outputs>
+ <output name="clicksummary" feed="impressions" instance="today(0,0)"/>
+ </outputs>
+
+ <!-- how -->
+ <workflow engine="pig" path="/apps/pig/id.pig"/>
+
+ <retry policy="periodic" delay="hours(10)" attempts="3"/>
+
+</process>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/85427981/oozie/src/test/resources/config/process/process-no-outputs.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/process/process-no-outputs.xml b/oozie/src/test/resources/config/process/process-no-outputs.xml
new file mode 100644
index 0000000..0acba60
--- /dev/null
+++ b/oozie/src/test/resources/config/process/process-no-outputs.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<process name="process-no-outputs" xmlns="uri:falcon:process:0.1">
+ <!-- where -->
+ <clusters>
+ <cluster name="corp">
+ <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+ </cluster>
+ </clusters>
+
+ <!-- when -->
+ <parallel>1</parallel>
+ <order>LIFO</order>
+ <frequency>hours(1)</frequency>
+ <timezone>UTC</timezone>
+
+ <!-- what = inputs only -->
+ <inputs>
+ <input name="click" feed="clicks" start="yesterday(0,0)" end="latest(0)" partition="*/US"/>
+ </inputs>
+
+ <!-- how -->
+ <workflow engine="pig" path="/apps/pig/id.pig"/>
+
+ <retry policy="periodic" delay="hours(10)" attempts="3"/>
+
+</process>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/85427981/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 24c6ec2..bacc20f 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -154,11 +154,13 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
for (LateInput lp : process.getLateProcess().getLateInputs()) {
Feed feed = null;
String endInstanceTime = "";
- for (Input input : process.getInputs().getInputs()) {
- if (input.getName().equals(lp.getInput())) {
- endInstanceTime = input.getEnd();
- feed = store.get(EntityType.FEED, input.getFeed());
- break;
+ if (process.getInputs() != null) {
+ for (Input input : process.getInputs().getInputs()) {
+ if (input.getName().equals(lp.getInput())) {
+ endInstanceTime = input.getEnd();
+ feed = store.get(EntityType.FEED, input.getFeed());
+ break;
+ }
}
}
if (feed == null) {