You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/08/19 22:21:33 UTC

git commit: FALCON-598 ProcessHelper throws NPE if the process has no inputs OR no outputs defined. Contributed by Balu Vellanki

Repository: incubator-falcon
Updated Branches:
  refs/heads/master 7827c39ea -> 85427981a


FALCON-598 ProcessHelper throws NPE if the process has no inputs OR no outputs defined. Contributed by Balu Vellanki


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/85427981
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/85427981
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/85427981

Branch: refs/heads/master
Commit: 85427981afeacef84ef3c10fc5fc4190cf291545
Parents: 7827c39
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Tue Aug 19 13:21:30 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Tue Aug 19 13:21:30 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../org/apache/falcon/resource/EntityList.java  | 16 ++--
 .../org/apache/falcon/entity/ProcessHelper.java | 14 ++--
 .../EntityRelationshipGraphBuilder.java         | 14 +++-
 .../metadata/MetadataMappingServiceTest.java    | 38 +++++++++-
 .../OozieProcessWorkflowBuilderTest.java        | 79 ++++++++++++++++++--
 .../config/process/process-no-inputs.xml        | 43 +++++++++++
 .../config/process/process-no-outputs.xml       | 43 +++++++++++
 .../falcon/rerun/handler/LateRerunHandler.java  | 12 +--
 9 files changed, 233 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/85427981/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 99ed6a6..ae8e03d 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -61,6 +61,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-598 ProcessHelper throws NPE if the process has no inputs OR no
+   outputs defined (Balu Vellanki via Venkatesh Seetharam)
+
    FALCON-583 Post processing is broken in current trunk
    (Venkatesh Seetharam via Suhas Vasu)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/85427981/client/src/main/java/org/apache/falcon/resource/EntityList.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/EntityList.java b/client/src/main/java/org/apache/falcon/resource/EntityList.java
index ae51172..2de177d 100644
--- a/client/src/main/java/org/apache/falcon/resource/EntityList.java
+++ b/client/src/main/java/org/apache/falcon/resource/EntityList.java
@@ -140,14 +140,18 @@ public class EntityList {
         }
 
         if (process != null) {
-            for (Input i : process.getInputs().getInputs()) {
-                if (i.getFeed().equals(entityNameToMatch)) {
-                    tagList.add("Input");
+            if (process.getInputs() != null) {
+                for (Input i : process.getInputs().getInputs()) {
+                    if (i.getFeed().equals(entityNameToMatch)) {
+                        tagList.add("Input");
+                    }
                 }
             }
-            for (Output o : process.getOutputs().getOutputs()) {
-                if (o.getFeed().equals(entityNameToMatch)) {
-                    tagList.add("Output");
+            if (process.getOutputs() != null) {
+                for (Output o : process.getOutputs().getOutputs()) {
+                    if (o.getFeed().equals(entityNameToMatch)) {
+                        tagList.add("Output");
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/85427981/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index 59361e8..8e0c87a 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -59,16 +59,18 @@ public final class ProcessHelper {
             return storageType;
         }
 
-        for (Input input : process.getInputs().getInputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
-            storageType = FeedHelper.getStorageType(feed, cluster);
-            if (Storage.TYPE.TABLE == storageType) {
-                break;
+        if (process.getInputs() != null) {
+            for (Input input : process.getInputs().getInputs()) {
+                Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+                storageType = FeedHelper.getStorageType(feed, cluster);
+                if (Storage.TYPE.TABLE == storageType) {
+                    break;
+                }
             }
         }
 
         // If input feeds storage type is file system check storage type of output feeds
-        if (Storage.TYPE.FILESYSTEM == storageType) {
+        if (process.getOutputs() != null && Storage.TYPE.FILESYSTEM == storageType) {
             for (Output output : process.getOutputs().getOutputs()) {
                 Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
                 storageType = FeedHelper.getStorageType(feed, cluster);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/85427981/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
index 29448bf..1b7a068 100644
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
@@ -358,8 +358,11 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
     }
 
     public static boolean areSame(Inputs oldProcessInputs, Inputs newProcessInputs) {
-        if (oldProcessInputs == null && newProcessInputs == null
-                || oldProcessInputs == null || newProcessInputs == null
+        if (oldProcessInputs == null && newProcessInputs == null) {
+            return true;
+        }
+
+        if (oldProcessInputs == null || newProcessInputs == null
                 || oldProcessInputs.getInputs().size() != newProcessInputs.getInputs().size()) {
             return false;
         }
@@ -418,8 +421,11 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
     }
 
     public static boolean areSame(Outputs oldProcessOutputs, Outputs newProcessOutputs) {
-        if (oldProcessOutputs == null && newProcessOutputs == null
-                || oldProcessOutputs == null || newProcessOutputs == null
+        if (oldProcessOutputs == null && newProcessOutputs == null) {
+            return true;
+        }
+
+        if (oldProcessOutputs == null || newProcessOutputs == null
                 || oldProcessOutputs.getOutputs().size() != newProcessOutputs.getOutputs().size()) {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/85427981/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 87779d9..b51caf8 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -34,6 +34,10 @@ import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Locations;
 import org.apache.falcon.entity.v0.process.EngineType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Inputs;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Outputs;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.service.Services;
@@ -261,7 +265,8 @@ public class MetadataMappingServiceTest {
         Feed oldFeed = inputFeeds.get(0);
         Feed newFeed = EntityBuilderTestUtil.buildFeed(oldFeed.getName(), clusterEntity,
                 "classified-as=Secured,source=data-warehouse", "reporting");
-        addStorage(newFeed, Storage.TYPE.FILESYSTEM, "jail://global:00/falcon/impression-feed/20140101");
+        addStorage(newFeed, Storage.TYPE.FILESYSTEM,
+                "jail://global:00/falcon/impression-feed/20140101");
 
         try {
             configStore.initiateUpdate(newFeed);
@@ -324,6 +329,37 @@ public class MetadataMappingServiceTest {
         Assert.assertEquals(getEdgesCount(service.getGraph()), 69); // -6 = -2 outputs, -1 tag, -1 cluster, -2 pipelines
     }
 
+    @Test(dependsOnMethods = "testOnProcessEntityChange")
+    public void testAreSame() throws Exception {
+
+        Inputs inputs1 = new Inputs();
+        Inputs inputs2 = new Inputs();
+        Outputs outputs1 = new Outputs();
+        Outputs outputs2 = new Outputs();
+        // return true when both are empty (no entries added); null arguments are not exercised here
+        Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(inputs1, inputs2));
+        Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2));
+
+        Input i1 = new Input();
+        i1.setName("input1");
+        Input i2 = new Input();
+        i2.setName("input2");
+        Output o1 = new Output();
+        o1.setName("output1");
+        Output o2 = new Output();
+        o2.setName("output2");
+
+        inputs1.getInputs().add(i1);
+        Assert.assertFalse(EntityRelationshipGraphBuilder.areSame(inputs1, inputs2));
+        outputs1.getOutputs().add(o1);
+        Assert.assertFalse(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2));
+
+        inputs2.getInputs().add(i1);
+        Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(inputs1, inputs2));
+        outputs2.getOutputs().add(o1);
+        Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2));
+    }
+
     private void verifyUpdatedEdges(Process newProcess) {
         Vertex processVertex = getEntityVertex(newProcess.getName(), RelationshipType.PROCESS_ENTITY);
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/85427981/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 45badee..4daf5d8 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -461,7 +461,8 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
 
         ACTION hiveNode = getAction(parentWorkflow, "user-action");
 
-        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
+        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(
+                hiveNode);
         org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
 
         Assert.assertEquals(hiveAction.getScript(),
@@ -572,14 +573,18 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
     private Map<String, String> getExpectedProperties(Feed inFeed, Feed outFeed,
                                                       Process process) throws FalconException {
         Map<String, String> expected = new HashMap<String, String>();
-        for (Input input : process.getInputs().getInputs()) {
-            CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, inFeed);
-            propagateStorageProperties(input.getName(), storage, expected);
+        if (process.getInputs() != null) {
+            for (Input input : process.getInputs().getInputs()) {
+                CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, inFeed);
+                propagateStorageProperties(input.getName(), storage, expected);
+            }
         }
 
-        for (Output output : process.getOutputs().getOutputs()) {
-            CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, outFeed);
-            propagateStorageProperties(output.getName(), storage, expected);
+        if (process.getOutputs() != null) {
+            for (Output output : process.getOutputs().getOutputs()) {
+                CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, outFeed);
+                propagateStorageProperties(output.getName(), storage, expected);
+            }
         }
 
         return expected;
@@ -696,4 +701,64 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
             Assert.assertTrue(props.containsKey(property), "expected property missing: " + property);
         }
     }
+
+    @Test
+    public void testProcessWithInputsNoOutputs() throws Exception {
+        ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
+
+        URL resource = this.getClass().getResource("/config/process/process-no-outputs.xml");
+        Process processEntity = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, processEntity);
+
+        OozieEntityBuilder builder = OozieEntityBuilder.get(processEntity);
+        Path bundlePath = new Path("/falcon/staging/workflows", processEntity.getName());
+        builder.build(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(fs, bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(processEntity).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, processEntity).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
+        HashMap<String, String> props = getCoordProperties(coord);
+        verifyEntityProperties(processEntity, cluster,
+                WorkflowExecutionContext.EntityOperations.GENERATE, props);
+        verifyBrokerProperties(cluster, props);
+
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "clicks");
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.FEED_NAMES.getName()), "NONE");
+    }
+
+    @Test
+    public void testProcessNoInputsWithOutputs() throws Exception {
+        ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
+
+        URL resource = this.getClass().getResource("/config/process/process-no-inputs.xml");
+        Process processEntity = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, processEntity);
+
+        OozieEntityBuilder builder = OozieEntityBuilder.get(processEntity);
+        Path bundlePath = new Path("/falcon/staging/workflows", processEntity.getName());
+        builder.build(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(fs, bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(processEntity).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, processEntity).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
+        HashMap<String, String> props = getCoordProperties(coord);
+        verifyEntityProperties(processEntity, cluster,
+                WorkflowExecutionContext.EntityOperations.GENERATE, props);
+        verifyBrokerProperties(cluster, props);
+
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.FEED_NAMES.getName()), "impressions");
+        Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "NONE");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/85427981/oozie/src/test/resources/config/process/process-no-inputs.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/process/process-no-inputs.xml b/oozie/src/test/resources/config/process/process-no-inputs.xml
new file mode 100644
index 0000000..85d01db
--- /dev/null
+++ b/oozie/src/test/resources/config/process/process-no-inputs.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="process-no-inputs" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what = outputs only -->
+    <outputs>
+        <output name="clicksummary" feed="impressions" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <workflow engine="pig" path="/apps/pig/id.pig"/>
+
+    <retry policy="periodic" delay="hours(10)" attempts="3"/>
+
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/85427981/oozie/src/test/resources/config/process/process-no-outputs.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/process/process-no-outputs.xml b/oozie/src/test/resources/config/process/process-no-outputs.xml
new file mode 100644
index 0000000..0acba60
--- /dev/null
+++ b/oozie/src/test/resources/config/process/process-no-outputs.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="process-no-outputs" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what = inputs only -->
+    <inputs>
+        <input name="click" feed="clicks" start="yesterday(0,0)" end="latest(0)" partition="*/US"/>
+    </inputs>
+
+    <!-- how -->
+    <workflow engine="pig" path="/apps/pig/id.pig"/>
+
+    <retry policy="periodic" delay="hours(10)" attempts="3"/>
+
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/85427981/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 24c6ec2..bacc20f 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -154,11 +154,13 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
             for (LateInput lp : process.getLateProcess().getLateInputs()) {
                 Feed feed = null;
                 String endInstanceTime = "";
-                for (Input input : process.getInputs().getInputs()) {
-                    if (input.getName().equals(lp.getInput())) {
-                        endInstanceTime = input.getEnd();
-                        feed = store.get(EntityType.FEED, input.getFeed());
-                        break;
+                if (process.getInputs() != null) {
+                    for (Input input : process.getInputs().getInputs()) {
+                        if (input.getName().equals(lp.getInput())) {
+                            endInstanceTime = input.getEnd();
+                            feed = store.get(EntityType.FEED, input.getFeed());
+                            break;
+                        }
                     }
                 }
                 if (feed == null) {