You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by su...@apache.org on 2016/03/09 20:33:43 UTC

incubator-atlas git commit: ATLAS-537 Falcon hook failing when tried to submit a process which creates a hive table. (shwethags via sumasai)

Repository: incubator-atlas
Updated Branches:
  refs/heads/master 0defc6e80 -> 161079155


ATLAS-537 Falcon hook failing when tried to submit a process which creates a hive table. (shwethags via sumasai)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/16107915
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/16107915
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/16107915

Branch: refs/heads/master
Commit: 161079155c403b86f4318e683ae46356017c7624
Parents: 0defc6e
Author: Suma Shivaprasad <su...@gmail.com>
Authored: Wed Mar 9 11:33:18 2016 -0800
Committer: Suma Shivaprasad <su...@gmail.com>
Committed: Wed Mar 9 11:33:18 2016 -0800

----------------------------------------------------------------------
 .../apache/atlas/falcon/hook/FalconHook.java    | 12 ++-
 .../apache/atlas/falcon/hook/FalconHookIT.java  | 81 ++++++++++++++++----
 .../src/test/resources/feed-hdfs.xml            | 39 ++++++++++
 release-log.txt                                 |  1 +
 4 files changed, 113 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/16107915/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
index 05765bb..47fa714 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
@@ -235,8 +235,10 @@ public class FalconHook extends FalconEventPublisher {
                 if (process.getInputs() != null) {
                     for (Input input : process.getInputs().getInputs()) {
                         List<Referenceable> clusterInputs = getInputOutputEntity(cluster, input.getFeed());
-                        entities.addAll(clusterInputs);
-                        inputs.add(clusterInputs.get(clusterInputs.size() -1 ));
+                        if (clusterInputs != null) {
+                            entities.addAll(clusterInputs);
+                            inputs.add(clusterInputs.get(clusterInputs.size() - 1));
+                        }
                     }
                 }
 
@@ -244,8 +246,10 @@ public class FalconHook extends FalconEventPublisher {
                 if (process.getOutputs() != null) {
                     for (Output output : process.getOutputs().getOutputs()) {
                         List<Referenceable> clusterOutputs = getInputOutputEntity(cluster, output.getFeed());
-                        entities.addAll(clusterOutputs);
-                        outputs.add(clusterOutputs.get(clusterOutputs.size() -1 ));
+                        if (clusterOutputs != null) {
+                            entities.addAll(clusterOutputs);
+                            outputs.add(clusterOutputs.get(clusterOutputs.size() - 1));
+                        }
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/16107915/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
index 12b7a8b..3881bd6 100644
--- a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
+++ b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
@@ -43,12 +43,15 @@ import javax.xml.bind.JAXBException;
 import java.util.List;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 
 public class FalconHookIT {
     public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(FalconHookIT.class);
 
     public static final String CLUSTER_RESOURCE = "/cluster.xml";
     public static final String FEED_RESOURCE = "/feed.xml";
+    public static final String FEED_HDFS_RESOURCE = "/feed-hdfs.xml";
     public static final String PROCESS_RESOURCE = "/process.xml";
 
     private AtlasClient dgiCLient;
@@ -96,21 +99,13 @@ public class FalconHookIT {
         Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
         STORE.publish(EntityType.CLUSTER, cluster);
 
-        Feed infeed = loadEntity(EntityType.FEED, FEED_RESOURCE, "feedin" + random());
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = infeed.getClusters().getClusters().get(0);
-        feedCluster.setName(cluster.getName());
-        String inTableName = "table" + random();
-        String inDbName = "db" + random();
-        feedCluster.getTable().setUri(getTableUri(inDbName, inTableName));
-        STORE.publish(EntityType.FEED, infeed);
+        Feed infeed = getTableFeed(FEED_RESOURCE, cluster.getName());
+        String inTableName = getTableName(infeed);
+        String inDbName = getDBName(infeed);
 
-        Feed outfeed = loadEntity(EntityType.FEED, FEED_RESOURCE, "feedout" + random());
-        feedCluster = outfeed.getClusters().getClusters().get(0);
-        feedCluster.setName(cluster.getName());
-        String outTableName = "table" + random();
-        String outDbName = "db" + random();
-        feedCluster.getTable().setUri(getTableUri(outDbName, outTableName));
-        STORE.publish(EntityType.FEED, outfeed);
+        Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
+        String outTableName = getTableName(outfeed);
+        String outDbName = getDBName(outfeed);
 
         Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
         process.getClusters().getClusters().get(0).setName(cluster.getName());
@@ -120,6 +115,7 @@ public class FalconHookIT {
 
         String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
         Referenceable processEntity = dgiCLient.getEntity(pid);
+        assertNotNull(processEntity);
         assertEquals(processEntity.get("processName"), process.getName());
 
         Id inId = (Id) ((List)processEntity.get("inputs")).get(0);
@@ -133,7 +129,60 @@ public class FalconHookIT {
                 HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName));
     }
 
-//    @Test (enabled = true, dependsOnMethods = "testCreateProcess")
+    private Feed getTableFeed(String feedResource, String clusterName) throws Exception {
+        Feed feed = loadEntity(EntityType.FEED, feedResource, "feed" + random());
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
+        feedCluster.setName(clusterName);
+        feedCluster.getTable().setUri(getTableUri("db" + random(), "table" + random()));
+        STORE.publish(EntityType.FEED, feed);
+        return feed;
+    }
+
+    private String getDBName(Feed feed) {
+        String uri = feed.getClusters().getClusters().get(0).getTable().getUri();
+        String[] parts = uri.split(":");
+        return parts[1];
+    }
+
+    private String getTableName(Feed feed) {
+        String uri = feed.getClusters().getClusters().get(0).getTable().getUri();
+        String[] parts = uri.split(":");
+        parts = parts[2].split("#");
+        return parts[0];
+    }
+
+    @Test (enabled = true)
+    public void testCreateProcessWithHDFSFeed() throws Exception {
+        Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
+        STORE.publish(EntityType.CLUSTER, cluster);
+
+        Feed infeed = loadEntity(EntityType.FEED, FEED_HDFS_RESOURCE, "feed" + random());
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = infeed.getClusters().getClusters().get(0);
+        feedCluster.setName(cluster.getName());
+        STORE.publish(EntityType.FEED, infeed);
+
+        Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
+        String outTableName = getTableName(outfeed);
+        String outDbName = getDBName(outfeed);
+
+        Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
+        process.getClusters().getClusters().get(0).setName(cluster.getName());
+        process.getInputs().getInputs().get(0).setFeed(infeed.getName());
+        process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
+        STORE.publish(EntityType.PROCESS, process);
+
+        String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
+        Referenceable processEntity = dgiCLient.getEntity(pid);
+        assertEquals(processEntity.get("processName"), process.getName());
+        assertNull(processEntity.get("inputs"));
+
+        Id outId = (Id) ((List)processEntity.get("outputs")).get(0);
+        Referenceable outEntity = dgiCLient.getEntity(outId._getId());
+        assertEquals(outEntity.get("name"),
+                HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName));
+    }
+
+    //    @Test (enabled = true, dependsOnMethods = "testCreateProcess")
 //    public void testUpdateProcess() throws Exception {
 //        FalconEvent event = createProcessEntity(PROCESS_NAME_2, INPUT, OUTPUT);
 //        FalconEventPublisher.Data data = new FalconEventPublisher.Data(event);
@@ -156,7 +205,7 @@ public class FalconHookIT {
     }
 
     private String assertEntityIsRegistered(final String query) throws Exception {
-        waitFor(20000, new Predicate() {
+        waitFor(2000000, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 JSONArray results = dgiCLient.search(query);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/16107915/addons/falcon-bridge/src/test/resources/feed-hdfs.xml
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/test/resources/feed-hdfs.xml b/addons/falcon-bridge/src/test/resources/feed-hdfs.xml
new file mode 100644
index 0000000..435db07
--- /dev/null
+++ b/addons/falcon-bridge/src/test/resources/feed-hdfs.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="test input" name="testinput" xmlns="uri:falcon:feed:0.1">
+    <groups>online,bi</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(3)"/>
+
+    <clusters>
+        <cluster name="testcluster" type="source">
+            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
+            <retention limit="hours(24)" action="delete"/>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/tmp/input/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
+    </locations>
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="hcat" provider="hcat"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/16107915/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 82ce633..9330850 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -11,6 +11,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-537 Falcon hook failing when tried to submit a process which creates a hive table ( shwethags via sumasai)
 ATLAS-476 Update type attribute with Reserved characters updated the original type as unknown (yhemanth via shwethags)
 ATLAS-463 Disconnect inverse references ( dkantor via sumasai)
 ATLAS-479 Add description for different types during create time (guptaneeru via shwethags)