You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by su...@apache.org on 2016/03/09 20:33:43 UTC
incubator-atlas git commit: ATLAS-537 Falcon hook failing when tried
to submit a process which creates a hive table. (shwethags via sumasai)
Repository: incubator-atlas
Updated Branches:
refs/heads/master 0defc6e80 -> 161079155
ATLAS-537 Falcon hook failing when tried to submit a process which creates a hive table. (shwethags via sumasai)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/16107915
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/16107915
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/16107915
Branch: refs/heads/master
Commit: 161079155c403b86f4318e683ae46356017c7624
Parents: 0defc6e
Author: Suma Shivaprasad <su...@gmail.com>
Authored: Wed Mar 9 11:33:18 2016 -0800
Committer: Suma Shivaprasad <su...@gmail.com>
Committed: Wed Mar 9 11:33:18 2016 -0800
----------------------------------------------------------------------
.../apache/atlas/falcon/hook/FalconHook.java | 12 ++-
.../apache/atlas/falcon/hook/FalconHookIT.java | 81 ++++++++++++++++----
.../src/test/resources/feed-hdfs.xml | 39 ++++++++++
release-log.txt | 1 +
4 files changed, 113 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/16107915/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
index 05765bb..47fa714 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
@@ -235,8 +235,10 @@ public class FalconHook extends FalconEventPublisher {
if (process.getInputs() != null) {
for (Input input : process.getInputs().getInputs()) {
List<Referenceable> clusterInputs = getInputOutputEntity(cluster, input.getFeed());
- entities.addAll(clusterInputs);
- inputs.add(clusterInputs.get(clusterInputs.size() -1 ));
+ if (clusterInputs != null) {
+ entities.addAll(clusterInputs);
+ inputs.add(clusterInputs.get(clusterInputs.size() - 1));
+ }
}
}
@@ -244,8 +246,10 @@ public class FalconHook extends FalconEventPublisher {
if (process.getOutputs() != null) {
for (Output output : process.getOutputs().getOutputs()) {
List<Referenceable> clusterOutputs = getInputOutputEntity(cluster, output.getFeed());
- entities.addAll(clusterOutputs);
- outputs.add(clusterOutputs.get(clusterOutputs.size() -1 ));
+ if (clusterOutputs != null) {
+ entities.addAll(clusterOutputs);
+ outputs.add(clusterOutputs.get(clusterOutputs.size() - 1));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/16107915/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
index 12b7a8b..3881bd6 100644
--- a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
+++ b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
@@ -43,12 +43,15 @@ import javax.xml.bind.JAXBException;
import java.util.List;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
public class FalconHookIT {
public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(FalconHookIT.class);
public static final String CLUSTER_RESOURCE = "/cluster.xml";
public static final String FEED_RESOURCE = "/feed.xml";
+ public static final String FEED_HDFS_RESOURCE = "/feed-hdfs.xml";
public static final String PROCESS_RESOURCE = "/process.xml";
private AtlasClient dgiCLient;
@@ -96,21 +99,13 @@ public class FalconHookIT {
Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
STORE.publish(EntityType.CLUSTER, cluster);
- Feed infeed = loadEntity(EntityType.FEED, FEED_RESOURCE, "feedin" + random());
- org.apache.falcon.entity.v0.feed.Cluster feedCluster = infeed.getClusters().getClusters().get(0);
- feedCluster.setName(cluster.getName());
- String inTableName = "table" + random();
- String inDbName = "db" + random();
- feedCluster.getTable().setUri(getTableUri(inDbName, inTableName));
- STORE.publish(EntityType.FEED, infeed);
+ Feed infeed = getTableFeed(FEED_RESOURCE, cluster.getName());
+ String inTableName = getTableName(infeed);
+ String inDbName = getDBName(infeed);
- Feed outfeed = loadEntity(EntityType.FEED, FEED_RESOURCE, "feedout" + random());
- feedCluster = outfeed.getClusters().getClusters().get(0);
- feedCluster.setName(cluster.getName());
- String outTableName = "table" + random();
- String outDbName = "db" + random();
- feedCluster.getTable().setUri(getTableUri(outDbName, outTableName));
- STORE.publish(EntityType.FEED, outfeed);
+ Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
+ String outTableName = getTableName(outfeed);
+ String outDbName = getDBName(outfeed);
Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
process.getClusters().getClusters().get(0).setName(cluster.getName());
@@ -120,6 +115,7 @@ public class FalconHookIT {
String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
Referenceable processEntity = dgiCLient.getEntity(pid);
+ assertNotNull(processEntity);
assertEquals(processEntity.get("processName"), process.getName());
Id inId = (Id) ((List)processEntity.get("inputs")).get(0);
@@ -133,7 +129,60 @@ public class FalconHookIT {
HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName));
}
-// @Test (enabled = true, dependsOnMethods = "testCreateProcess")
+ private Feed getTableFeed(String feedResource, String clusterName) throws Exception {
+ Feed feed = loadEntity(EntityType.FEED, feedResource, "feed" + random());
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
+ feedCluster.setName(clusterName);
+ feedCluster.getTable().setUri(getTableUri("db" + random(), "table" + random()));
+ STORE.publish(EntityType.FEED, feed);
+ return feed;
+ }
+
+ private String getDBName(Feed feed) {
+ String uri = feed.getClusters().getClusters().get(0).getTable().getUri();
+ String[] parts = uri.split(":");
+ return parts[1];
+ }
+
+ private String getTableName(Feed feed) {
+ String uri = feed.getClusters().getClusters().get(0).getTable().getUri();
+ String[] parts = uri.split(":");
+ parts = parts[2].split("#");
+ return parts[0];
+ }
+
+ @Test (enabled = true)
+ public void testCreateProcessWithHDFSFeed() throws Exception {
+ Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
+ STORE.publish(EntityType.CLUSTER, cluster);
+
+ Feed infeed = loadEntity(EntityType.FEED, FEED_HDFS_RESOURCE, "feed" + random());
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = infeed.getClusters().getClusters().get(0);
+ feedCluster.setName(cluster.getName());
+ STORE.publish(EntityType.FEED, infeed);
+
+ Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
+ String outTableName = getTableName(outfeed);
+ String outDbName = getDBName(outfeed);
+
+ Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
+ process.getClusters().getClusters().get(0).setName(cluster.getName());
+ process.getInputs().getInputs().get(0).setFeed(infeed.getName());
+ process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
+ STORE.publish(EntityType.PROCESS, process);
+
+ String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
+ Referenceable processEntity = dgiCLient.getEntity(pid);
+ assertEquals(processEntity.get("processName"), process.getName());
+ assertNull(processEntity.get("inputs"));
+
+ Id outId = (Id) ((List)processEntity.get("outputs")).get(0);
+ Referenceable outEntity = dgiCLient.getEntity(outId._getId());
+ assertEquals(outEntity.get("name"),
+ HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName));
+ }
+
+ // @Test (enabled = true, dependsOnMethods = "testCreateProcess")
// public void testUpdateProcess() throws Exception {
// FalconEvent event = createProcessEntity(PROCESS_NAME_2, INPUT, OUTPUT);
// FalconEventPublisher.Data data = new FalconEventPublisher.Data(event);
@@ -156,7 +205,7 @@ public class FalconHookIT {
}
private String assertEntityIsRegistered(final String query) throws Exception {
- waitFor(20000, new Predicate() {
+ waitFor(2000000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = dgiCLient.search(query);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/16107915/addons/falcon-bridge/src/test/resources/feed-hdfs.xml
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/test/resources/feed-hdfs.xml b/addons/falcon-bridge/src/test/resources/feed-hdfs.xml
new file mode 100644
index 0000000..435db07
--- /dev/null
+++ b/addons/falcon-bridge/src/test/resources/feed-hdfs.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<feed description="test input" name="testinput" xmlns="uri:falcon:feed:0.1">
+ <groups>online,bi</groups>
+
+ <frequency>hours(1)</frequency>
+ <timezone>UTC</timezone>
+ <late-arrival cut-off="hours(3)"/>
+
+ <clusters>
+ <cluster name="testcluster" type="source">
+ <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
+ <retention limit="hours(24)" action="delete"/>
+ </cluster>
+ </clusters>
+
+ <locations>
+ <location type="data" path="/tmp/input/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
+ </locations>
+
+ <ACL owner="testuser" group="group" permission="0x755"/>
+ <schema location="hcat" provider="hcat"/>
+</feed>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/16107915/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 82ce633..9330850 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -11,6 +11,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
+ATLAS-537 Falcon hook failing when tried to submit a process which creates a hive table ( shwethags via sumasai)
ATLAS-476 Update type attribute with Reserved characters updated the original type as unknown (yhemanth via shwethags)
ATLAS-463 Disconnect inverse references ( dkantor via sumasai)
ATLAS-479 Add description for different types during create time (guptaneeru via shwethags)