You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sh...@apache.org on 2015/12/23 10:11:12 UTC
[2/2] incubator-atlas git commit: ATLAS-379 Create sqoop and falcon
metadata addons (venkatnrangan, bvellanki, sowmyaramesh via shwethags)
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/086b4a3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/086b4a3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/086b4a3e
Branch: refs/heads/master
Commit: 086b4a3ee0480d37f9aa66fb878c9dc978aaa043
Parents: 70d5498
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Wed Dec 23 14:40:49 2015 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Wed Dec 23 14:41:00 2015 +0530
----------------------------------------------------------------------
addons/falcon-bridge/pom.xml | 274 ++++++++++++++
.../apache/atlas/falcon/hook/FalconHook.java | 356 ++++++++++++++++++
.../falcon/model/FalconDataModelGenerator.java | 153 ++++++++
.../atlas/falcon/model/FalconDataTypes.java | 40 +++
.../org/apache/falcon/atlas/Util/EventUtil.java | 68 ++++
.../apache/falcon/atlas/event/FalconEvent.java | 65 ++++
.../atlas/publisher/FalconEventPublisher.java | 41 +++
.../falcon/atlas/service/AtlasService.java | 115 ++++++
.../apache/atlas/falcon/hook/FalconHookIT.java | 205 +++++++++++
.../src/test/resources/cluster.xml | 45 +++
.../falcon-bridge/src/test/resources/feed.xml | 38 ++
.../src/test/resources/hive-site.xml | 53 +++
.../src/test/resources/process.xml | 53 +++
.../src/test/resources/startup.properties | 20 ++
.../atlas/hive/bridge/HiveMetaStoreBridge.java | 9 +-
.../hive/model/HiveDataModelGenerator.java | 2 +-
addons/sqoop-bridge/pom.xml | 357 +++++++++++++++++++
.../org/apache/atlas/sqoop/hook/SqoopHook.java | 230 ++++++++++++
.../sqoop/model/SqoopDataModelGenerator.java | 180 ++++++++++
.../atlas/sqoop/model/SqoopDataTypes.java | 34 ++
.../apache/atlas/sqoop/hook/SqoopHookIT.java | 124 +++++++
.../src/test/resources/hive-site.xml | 53 +++
.../src/test/resources/sqoop-site.xml | 190 ++++++++++
distro/src/bin/atlas_start.py | 2 +-
.../src/main/assemblies/standalone-package.xml | 12 +
distro/src/test/python/scripts/TestMetadata.py | 4 +-
docs/src/site/twiki/Architecture.twiki | 3 +-
docs/src/site/twiki/Bridge-Falcon.twiki | 34 ++
docs/src/site/twiki/Bridge-Sqoop.twiki | 37 ++
docs/src/site/twiki/index.twiki | 2 +
.../notification/hook/HookNotification.java | 7 +-
pom.xml | 20 ++
release-log.txt | 1 +
.../main/resources/atlas-application.properties | 2 +-
34 files changed, 2818 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml
new file mode 100644
index 0000000..73ef265
--- /dev/null
+++ b/addons/falcon-bridge/pom.xml
@@ -0,0 +1,274 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>apache-atlas</artifactId>
+ <groupId>org.apache.atlas</groupId>
+ <version>0.7-incubating-SNAPSHOT</version>
+ <relativePath>../../</relativePath>
+ </parent>
+ <artifactId>falcon-bridge</artifactId>
+ <description>Apache Atlas Falcon Bridge Module</description>
+ <name>Apache Atlas Falcon Bridge</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <falcon.version>0.8</falcon.version>
+ </properties>
+
+ <dependencies>
+ <!-- None of the dependencies below declare a version except falcon-common; -->
+ <!-- presumably the versions come from the parent POM's dependencyManagement (TODO confirm). -->
+ <!-- Logging -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+
+ <!-- Atlas modules used by the hook: type system, REST client and notification API -->
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-typesystem</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-notification</artifactId>
+ </dependency>
+
+ <!-- Falcon classes are needed to compile but are not packaged with this module (provided scope) -->
+ <dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-common</artifactId>
+ <version>${falcon.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- The Falcon hook reuses the Hive bridge and Hive data model classes -->
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>hive-bridge</artifactId>
+ </dependency>
+
+ <!-- NOTE(review): no scope is declared for testng here; presumably the parent POM -->
+ <!-- restricts it to test scope - confirm. -->
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- During the package phase, copies this module's jar and the artifacts listed below -->
+ <!-- into ${project.build.directory}/dependency/hook/falcon. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-hook-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/dependency/hook/falcon</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>false</overWriteSnapshots>
+ <overWriteIfNewer>true</overWriteIfNewer>
+ <artifactItems>
+ <!-- the falcon-bridge jar itself -->
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>${project.artifactId}</artifactId>
+ <version>${project.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>hive-bridge</artifactId>
+ <version>${project.version}</version>
+ </artifactItem>
+ <!-- json4s artifacts; ${json.version} is not defined in this POM (presumably in the parent) -->
+ <artifactItem>
+ <groupId>org.json4s</groupId>
+ <artifactId>json4s-native_2.10</artifactId>
+ <version>${json.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.json4s</groupId>
+ <artifactId>json4s-core_2.10</artifactId>
+ <version>${json.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.json4s</groupId>
+ <artifactId>json4s-ast_2.10</artifactId>
+ <version>${json.version}</version>
+ </artifactItem>
+ <!-- Atlas modules -->
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>atlas-client</artifactId>
+ <version>${project.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>atlas-typesystem</artifactId>
+ <version>${project.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>atlas-notification</artifactId>
+ <version>${project.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>atlas-common</artifactId>
+ <version>${project.version}</version>
+ </artifactItem>
+ <!-- Scala artifacts -->
+ <artifactItem>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scalap</artifactId>
+ <version>${scala.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>com.google.inject.extensions</groupId>
+ <artifactId>guice-multibindings</artifactId>
+ <version>${guice.version}</version>
+ </artifactItem>
+ <!-- Kafka artifacts -->
+ <artifactItem>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ <version>${kafka.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Runs the Atlas webapp WAR from ../../webapp/target in Jetty on port 31000: -->
+ <!-- deployed in pre-integration-test and stopped in post-integration-test. -->
+ <plugin>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-maven-plugin</artifactId>
+ <configuration>
+ <!--<skip>${skipTests}</skip>-->
+ <!--only skip int tests -->
+ <httpConnector>
+ <port>31000</port>
+ <idleTimeout>60000</idleTimeout>
+ </httpConnector>
+ <war>../../webapp/target/atlas-webapp-${project.version}.war</war>
+ <daemon>true</daemon>
+ <webApp>
+ <contextPath>/</contextPath>
+ <descriptor>../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor>
+ </webApp>
+ <useTestScope>true</useTestScope>
+ <!-- log and data directories are placed under this module's build directory -->
+ <systemProperties>
+ <systemProperty>
+ <name>log4j.configuration</name>
+ <value>atlas-log4j.xml</value>
+ </systemProperty>
+ <systemProperty>
+ <name>atlas.log.dir</name>
+ <value>${project.build.directory}/logs</value>
+ </systemProperty>
+ <systemProperty>
+ <name>atlas.data</name>
+ <value>${project.build.directory}/data</value>
+ </systemProperty>
+ </systemProperties>
+ <!-- key and port used by the stop goal below -->
+ <stopKey>atlas-stop</stopKey>
+ <stopPort>31001</stopPort>
+ </configuration>
+ <executions>
+ <execution>
+ <id>start-jetty</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>deploy-war</goal>
+ </goals>
+ <configuration>
+ <!-- repeats the plugin-level daemon setting above -->
+ <daemon>true</daemon>
+ </configuration>
+ </execution>
+ <execution>
+ <id>stop-jetty</id>
+ <phase>post-integration-test</phase>
+ <goals>
+ <goal>stop</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Runs the site goal in the prepare-package phase with the Doxia twiki module on the -->
+ <!-- plugin classpath; project-info and report generation are switched off. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-site-plugin</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.maven.doxia</groupId>
+ <artifactId>doxia-module-twiki</artifactId>
+ <version>1.3</version>
+ </dependency>
+ </dependencies>
+ <executions>
+ <execution>
+ <goals>
+ <goal>site</goal>
+ </goals>
+ <phase>prepare-package</phase>
+ </execution>
+ </executions>
+ <configuration>
+ <generateProjectInfo>false</generateProjectInfo>
+ <generateReports>false</generateReports>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
new file mode 100644
index 0000000..05765bb
--- /dev/null
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.falcon.hook;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.falcon.model.FalconDataModelGenerator;
+import org.apache.atlas.falcon.model.FalconDataTypes;
+import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
+import org.apache.atlas.hive.model.HiveDataModelGenerator;
+import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.notification.NotificationModule;
+import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.atlas.Util.EventUtil;
+import org.apache.falcon.atlas.event.FalconEvent;
+import org.apache.falcon.atlas.publisher.FalconEventPublisher;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Cluster;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Falcon hook that sends lineage information to the Atlas service.
+ *
+ * Events passed to {@link #publish} are converted into Atlas entities and sent as a
+ * {@link HookNotification.EntityCreateRequest}. Unless the {@code atlas.hook.falcon.synchronous}
+ * property is set, the conversion and send happen on a background thread pool.
+ */
+public class FalconHook extends FalconEventPublisher {
+ private static final Logger LOG = LoggerFactory.getLogger(FalconHook.class);
+
+ public static final String CONF_PREFIX = "atlas.hook.falcon.";
+ private static final String MIN_THREADS = CONF_PREFIX + "minThreads";
+ private static final String MAX_THREADS = CONF_PREFIX + "maxThreads";
+ private static final String KEEP_ALIVE_TIME = CONF_PREFIX + "keepAliveTime";
+ public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize";
+ public static final String CONF_SYNC = CONF_PREFIX + "synchronous";
+
+ public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
+
+ public static final String ATLAS_ENDPOINT = "atlas.rest.address";
+
+ private static AtlasClient atlasClient;
+
+ // wait time determines how long we wait before we exit the jvm on
+ // shutdown. Pending requests after that will not be sent.
+ private static final int WAIT_TIME = 3;
+ private static ExecutorService executor;
+
+ private static final int minThreadsDefault = 5;
+ private static final int maxThreadsDefault = 5;
+ // keep-alive is passed to the thread pool in milliseconds (see the static initializer)
+ private static final long keepAliveTimeDefault = 10;
+ private static final int queueSizeDefault = 10000;
+
+ private static Configuration atlasProperties;
+ // NOTE(review): the field is assigned explicitly from a Guice injector in the static initializer;
+ // the @Inject annotation on this static field does not appear to be what populates it - confirm.
+ @Inject
+ private static NotificationInterface notifInterface;
+
+ // read and written by pool threads in fireAndForget(), hence volatile
+ public static volatile boolean typesRegistered = false;
+
+ private static boolean sync;
+
+ private static ConfigurationStore STORE;
+
+ static {
+ try {
+ atlasProperties = ApplicationProperties.get();
+
+ // initialize the async facility to process hook calls. We don't
+ // want to do this inline since it adds plenty of overhead for the query.
+ int minThreads = atlasProperties.getInt(MIN_THREADS, minThreadsDefault);
+ int maxThreads = atlasProperties.getInt(MAX_THREADS, maxThreadsDefault);
+ long keepAliveTime = atlasProperties.getLong(KEEP_ALIVE_TIME, keepAliveTimeDefault);
+ int queueSize = atlasProperties.getInt(QUEUE_SIZE, queueSizeDefault);
+ sync = atlasProperties.getBoolean(CONF_SYNC, false);
+
+ executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(queueSize),
+ new ThreadFactoryBuilder().setNameFormat("Atlas Logger %d").build());
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ try {
+ executor.shutdown();
+ executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
+ executor = null;
+ } catch (InterruptedException ie) {
+ LOG.info("Interrupt received in shutdown.");
+ // restore the interrupt status instead of swallowing it
+ Thread.currentThread().interrupt();
+ }
+ // shutdown client
+ }
+ });
+ atlasClient = new AtlasClient(atlasProperties.getString(ATLAS_ENDPOINT),
+ EventUtil.getUgi(), EventUtil.getUgi().getShortUserName());
+
+ STORE = ConfigurationStore.get();
+ } catch (Exception e) {
+ // an initialization failure leaves the hook unusable, so report it as an error
+ LOG.error("Caught exception initializing the falcon hook.", e);
+ }
+
+ Injector injector = Guice.createInjector(new NotificationModule());
+ notifInterface = injector.getInstance(NotificationInterface.class);
+
+ LOG.info("Created Atlas Hook for Falcon");
+ }
+
+ /**
+ * Handles a Falcon event, inline when the hook is configured as synchronous and on the
+ * background executor otherwise. Failures in the background task are logged, not rethrown.
+ *
+ * @param data carrier of the Falcon event to process
+ * @throws Exception if synchronous processing fails or the task cannot be submitted
+ */
+ @Override
+ public void publish(final Data data) throws Exception {
+ final FalconEvent event = data.getEvent();
+ if (sync) {
+ fireAndForget(event);
+ } else {
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ fireAndForget(event);
+ } catch (Throwable e) {
+ LOG.info("Atlas hook failed", e);
+ }
+ }
+ });
+ }
+ }
+
+ /**
+ * Registers the Falcon data model on first use, then builds the entities for the event and
+ * sends them to Atlas. Nothing is sent when the event yields no entities.
+ *
+ * @param event Falcon event to process
+ */
+ private void fireAndForget(FalconEvent event) throws Exception {
+ LOG.info("Entered Atlas hook for Falcon hook operation {}", event.getOperation());
+
+ if (!typesRegistered) {
+ registerFalconDataModel();
+ typesRegistered = true;
+ }
+
+ List<Referenceable> entities = createEntities(event);
+ if (entities == null || entities.isEmpty()) {
+ LOG.info("No Atlas entities to send for Falcon operation {}", event.getOperation());
+ return;
+ }
+
+ notifyEntity(entities);
+ }
+
+ /**
+ * Maps a Falcon event to the Atlas entities that describe it.
+ *
+ * @param event Falcon event to convert
+ * @return entities for the event; an empty list (never null) for operations that are not handled
+ */
+ private List<Referenceable> createEntities(FalconEvent event) throws Exception {
+ switch (event.getOperation()) {
+ case ADD_PROCESS:
+ return createProcessInstance((Process) event.getEntity(), event.getUser(), event.getTimestamp());
+ default:
+ // operations without a mapping produce no entities
+ return new ArrayList<>();
+ }
+ }
+
+ /**
+ * Notify atlas of the entity through message. The entity can be a complex entity with reference to other entities.
+ * De-duping of entities is done on server side depending on the unique attribute of the entity.
+ * The send is attempted up to {@code atlas.hook.falcon.numRetries} times (default 3); after the
+ * last failure the error is logged and the entities are dropped.
+ *
+ * @param entities entities to add; a null or empty list is ignored
+ */
+ private void notifyEntity(List<Referenceable> entities) {
+ if (entities == null || entities.isEmpty()) {
+ return;
+ }
+
+ int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
+ String message = entities.toString();
+
+ int numRetries = 0;
+ while (true) {
+ try {
+ notifInterface.send(NotificationInterface.NotificationType.HOOK,
+ new HookNotification.EntityCreateRequest(entities));
+ return;
+ } catch (Exception e) {
+ numRetries++;
+ if (numRetries < maxRetries) {
+ LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e);
+ } else {
+ LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting", message,
+ maxRetries, e);
+ break;
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Creates the Atlas entities for a Falcon process: for every cluster the process runs on, the
+ * Hive database/table entities of its catalog-backed input and output feeds plus one process
+ * entity named {@code processname@clustername}. A cluster for which no input or output table
+ * could be resolved produces no process entity.
+ *
+ * @param process Falcon process definition
+ * @param user user recorded as owner of the process entity
+ * @param timestamp timestamp recorded on the process entity
+ * @return all created entities, possibly empty
+ */
+ public List<Referenceable> createProcessInstance(Process process, String user, long timestamp) throws Exception {
+ LOG.info("Creating process Instance : {}", process.getName());
+
+ // The requirement is for each cluster, create a process entity with name
+ // processname@clustername
+ List<Referenceable> entities = new ArrayList<>();
+
+ if (process.getClusters() != null) {
+
+ for (Cluster processCluster : process.getClusters().getClusters()) {
+ org.apache.falcon.entity.v0.cluster.Cluster cluster = STORE.get(EntityType.CLUSTER, processCluster.getName());
+
+ List<Referenceable> inputs = new ArrayList<>();
+ if (process.getInputs() != null) {
+ for (Input input : process.getInputs().getInputs()) {
+ List<Referenceable> clusterInputs = getInputOutputEntity(cluster, input.getFeed());
+ // feeds without a catalog table resolve to no entities and are skipped
+ if (!clusterInputs.isEmpty()) {
+ entities.addAll(clusterInputs);
+ // the table reference is the last element returned
+ inputs.add(clusterInputs.get(clusterInputs.size() - 1));
+ }
+ }
+ }
+
+ List<Referenceable> outputs = new ArrayList<>();
+ if (process.getOutputs() != null) {
+ for (Output output : process.getOutputs().getOutputs()) {
+ List<Referenceable> clusterOutputs = getInputOutputEntity(cluster, output.getFeed());
+ if (!clusterOutputs.isEmpty()) {
+ entities.addAll(clusterOutputs);
+ outputs.add(clusterOutputs.get(clusterOutputs.size() - 1));
+ }
+ }
+ }
+
+ if (!inputs.isEmpty() || !outputs.isEmpty()) {
+ Referenceable processEntity = new Referenceable(FalconDataTypes.FALCON_PROCESS_ENTITY.getName());
+ processEntity.set(FalconDataModelGenerator.NAME, String.format("%s@%s", process.getName(),
+ cluster.getName()));
+ processEntity.set(FalconDataModelGenerator.PROCESS_NAME, process.getName());
+
+ processEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
+ if (!inputs.isEmpty()) {
+ processEntity.set(FalconDataModelGenerator.INPUTS, inputs);
+ }
+ if (!outputs.isEmpty()) {
+ processEntity.set(FalconDataModelGenerator.OUTPUTS, outputs);
+ }
+ processEntity.set(FalconDataModelGenerator.USER, user);
+
+ if (StringUtils.isNotEmpty(process.getTags())) {
+ processEntity.set(FalconDataModelGenerator.TAGS,
+ EventUtil.convertKeyValueStringToMap(process.getTags()));
+ }
+ entities.add(processEntity);
+ }
+
+ }
+ }
+
+ return entities;
+ }
+
+ /**
+ * Resolves a feed on a cluster to Hive entities.
+ *
+ * @param cluster Falcon cluster the process runs on
+ * @param feedName name of the feed to look up in the configuration store
+ * @return the database reference followed by the table reference, or an empty list (never null)
+ * when the feed has no catalog table for the cluster
+ */
+ private List<Referenceable> getInputOutputEntity(org.apache.falcon.entity.v0.cluster.Cluster cluster, String feedName) throws Exception {
+ Feed feed = STORE.get(EntityType.FEED, feedName);
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+
+ final CatalogTable table = getTable(feedCluster, feed);
+ if (table != null) {
+ CatalogStorage storage = new CatalogStorage(cluster, table);
+ return createHiveTableInstance(cluster.getName(), storage.getDatabase().toLowerCase(),
+ storage.getTable().toLowerCase());
+ }
+
+ // no catalog table: nothing to report for this feed
+ return new ArrayList<>();
+ }
+
+ /**
+ * Picks the catalog table for a feed, preferring the cluster-level override.
+ *
+ * @param cluster feed cluster definition; may be null when the feed is not defined on the cluster
+ * @param feed feed definition
+ * @return the cluster-level table if set, otherwise the feed-level table (which may be null)
+ */
+ private CatalogTable getTable(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed) {
+ // check if table is overridden in cluster
+ if (cluster != null && cluster.getTable() != null) {
+ return cluster.getTable();
+ }
+
+ return feed.getTable();
+ }
+
+ /**
+ * Builds a Hive database reference carrying the cluster name, database name and qualified name.
+ *
+ * @param clusterName name of the cluster the database belongs to
+ * @param dbName database name
+ * @return the database reference
+ */
+ private Referenceable createHiveDatabaseInstance(String clusterName, String dbName)
+ throws Exception {
+ Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
+ dbRef.set(HiveDataModelGenerator.CLUSTER_NAME, clusterName);
+ dbRef.set(HiveDataModelGenerator.NAME, dbName);
+ dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+ HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
+ return dbRef;
+ }
+
+ /**
+ * Builds the Hive database and table references for a table.
+ *
+ * @param clusterName name of the cluster the table belongs to
+ * @param dbName database name
+ * @param tableName table name
+ * @return a two-element list: the database reference, then the table reference
+ */
+ private List<Referenceable> createHiveTableInstance(String clusterName, String dbName, String tableName) throws Exception {
+ List<Referenceable> entities = new ArrayList<>();
+ Referenceable dbRef = createHiveDatabaseInstance(clusterName, dbName);
+ entities.add(dbRef);
+
+ Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
+ tableRef.set(HiveDataModelGenerator.NAME,
+ HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
+ tableRef.set(HiveDataModelGenerator.TABLE_NAME, tableName);
+ tableRef.set(HiveDataModelGenerator.DB, dbRef);
+ entities.add(tableRef);
+
+ return entities;
+ }
+
+ /**
+ * Registers the Hive data model and then the Falcon data model with Atlas, unless the Falcon
+ * process type is already known to the server.
+ */
+ public synchronized void registerFalconDataModel() throws Exception {
+ if (isDataModelAlreadyRegistered()) {
+ LOG.info("Falcon data model is already registered!");
+ return;
+ }
+
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasProperties,
+ ugi.getShortUserName(), ugi);
+ hiveMetaStoreBridge.registerHiveDataModel();
+
+ FalconDataModelGenerator dataModelGenerator = new FalconDataModelGenerator();
+ LOG.info("Registering Falcon data model");
+ atlasClient.createType(dataModelGenerator.getModelAsJson());
+ }
+
+ /**
+ * Checks whether the Falcon process type exists on the Atlas server.
+ *
+ * @return true if the type lookup succeeds, false if the server answers NOT_FOUND
+ * @throws AtlasServiceException for any other failure of the lookup
+ */
+ private boolean isDataModelAlreadyRegistered() throws Exception {
+ try {
+ atlasClient.getType(FalconDataTypes.FALCON_PROCESS_ENTITY.getName());
+ // the caller logs the "already registered" message
+ return true;
+ } catch(AtlasServiceException ase) {
+ if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
+ return false;
+ }
+ throw ase;
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java
new file mode 100644
index 0000000..ac9dd85
--- /dev/null
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.falcon.model;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.typesystem.TypesDef;
+import org.apache.atlas.typesystem.json.TypesSerialization;
+import org.apache.atlas.typesystem.types.AttributeDefinition;
+import org.apache.atlas.typesystem.types.ClassType;
+import org.apache.atlas.typesystem.types.DataTypes;
+import org.apache.atlas.typesystem.types.EnumType;
+import org.apache.atlas.typesystem.types.EnumTypeDefinition;
+import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
+import org.apache.atlas.typesystem.types.Multiplicity;
+import org.apache.atlas.typesystem.types.StructType;
+import org.apache.atlas.typesystem.types.StructTypeDefinition;
+import org.apache.atlas.typesystem.types.TraitType;
+import org.apache.atlas.typesystem.types.utils.TypesUtil;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Generates the Atlas type definitions for the Falcon data model and serializes them to JSON.
+ */
+public class FalconDataModelGenerator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FalconDataModelGenerator.class);
+
+ // attribute names of the Falcon process type
+ public static final String NAME = "name";
+ public static final String PROCESS_NAME = "processName";
+ public static final String TIMESTAMP = "timestamp";
+ public static final String USER = "owned-by";
+ public static final String TAGS = "tag-classification";
+
+ // multiple inputs and outputs for process
+ public static final String INPUTS = "inputs";
+ public static final String OUTPUTS = "outputs";
+
+ // class definitions are keyed by type name; the enum and struct maps are never filled by this class
+ private final Map<String, HierarchicalTypeDefinition<ClassType>> classTypeDefinitions = new HashMap<>();
+ private final Map<String, EnumTypeDefinition> enumTypeDefinitionMap = new HashMap<>();
+ private final Map<String, StructTypeDefinition> structTypeDefinitionMap = new HashMap<>();
+
+ /**
+ * Creates a generator with no type definitions; call {@link #createDataModel()} or
+ * {@link #getModelAsJson()} to populate it.
+ */
+ public FalconDataModelGenerator() {
+ }
+
+ /**
+ * Populates this generator with the Falcon type definitions.
+ */
+ public void createDataModel() throws AtlasException {
+ LOG.info("Generating the Falcon Data Model");
+ createProcessEntityClass();
+ }
+
+ /**
+ * Populates the model and returns it serialized as JSON.
+ *
+ * @return JSON form of the Falcon type definitions
+ */
+ public String getModelAsJson() throws AtlasException {
+ createDataModel();
+ return getDataModelAsJSON();
+ }
+
+ /**
+ * Serializes whatever definitions are currently held; does not populate the model itself.
+ *
+ * @return JSON form of the current type definitions
+ */
+ public String getDataModelAsJSON() {
+ TypesDef currentModel = getTypesDef();
+ return TypesSerialization.toJson(currentModel);
+ }
+
+ // Bundles the enum, struct, trait and class definitions into a single TypesDef.
+ private TypesDef getTypesDef() {
+ ImmutableList<EnumTypeDefinition> enums = getEnumTypeDefinitions();
+ ImmutableList<StructTypeDefinition> structs = getStructTypeDefinitions();
+ ImmutableList<HierarchicalTypeDefinition<TraitType>> traits = getTraitTypeDefinitions();
+ ImmutableList<HierarchicalTypeDefinition<ClassType>> classes = getClassTypeDefinitions();
+ return TypesUtil.getTypesDef(enums, structs, traits, classes);
+ }
+
+ private ImmutableList<EnumTypeDefinition> getEnumTypeDefinitions() {
+ return ImmutableList.copyOf(enumTypeDefinitionMap.values());
+ }
+
+ private ImmutableList<StructTypeDefinition> getStructTypeDefinitions() {
+ return ImmutableList.copyOf(structTypeDefinitionMap.values());
+ }
+
+ private ImmutableList<HierarchicalTypeDefinition<ClassType>> getClassTypeDefinitions() {
+ return ImmutableList.copyOf(classTypeDefinitions.values());
+ }
+
+ // The Falcon model defines no traits.
+ private ImmutableList<HierarchicalTypeDefinition<TraitType>> getTraitTypeDefinitions() {
+ return ImmutableList.of();
+ }
+
+ // Defines the Falcon process class type as a subtype of the Atlas process super type.
+ private void createProcessEntityClass() throws AtlasException {
+ final String stringType = DataTypes.STRING_TYPE.getName();
+ final String processTypeName = FalconDataTypes.FALCON_PROCESS_ENTITY.getName();
+
+ AttributeDefinition processNameAttr =
+ new AttributeDefinition(PROCESS_NAME, stringType, Multiplicity.REQUIRED, false, null);
+ AttributeDefinition timestampAttr =
+ new AttributeDefinition(TIMESTAMP, DataTypes.LONG_TYPE.getName(), Multiplicity.REQUIRED, false, null);
+ AttributeDefinition userAttr =
+ new AttributeDefinition(USER, stringType, Multiplicity.REQUIRED, false, null);
+ // map of tags
+ AttributeDefinition tagsAttr =
+ new AttributeDefinition(TAGS, DataTypes.mapTypeName(stringType, stringType),
+ Multiplicity.OPTIONAL, false, null);
+
+ AttributeDefinition[] processAttributes =
+ new AttributeDefinition[]{processNameAttr, timestampAttr, userAttr, tagsAttr};
+
+ HierarchicalTypeDefinition<ClassType> processType =
+ new HierarchicalTypeDefinition<>(ClassType.class, processTypeName,
+ ImmutableList.of(AtlasClient.PROCESS_SUPER_TYPE), processAttributes);
+ classTypeDefinitions.put(processTypeName, processType);
+ LOG.debug("Created definition for {}", processTypeName);
+ }
+
+ /**
+ * Prints the model as JSON followed by one summary line per enum, struct, class and trait type.
+ */
+ public static void main(String[] args) throws Exception {
+ FalconDataModelGenerator generator = new FalconDataModelGenerator();
+ String modelJson = generator.getModelAsJson();
+ System.out.println("falconDataModelAsJSON = " + modelJson);
+
+ TypesDef model = generator.getTypesDef();
+ for (EnumTypeDefinition enumDef : model.enumTypesAsJavaList()) {
+ String summary = String.format("%s(%s) - values %s", enumDef.name, EnumType.class.getSimpleName(),
+ Arrays.toString(enumDef.enumValues));
+ System.out.println(summary);
+ }
+ for (StructTypeDefinition structDef : model.structTypesAsJavaList()) {
+ String summary = String.format("%s(%s) - attributes %s", structDef.typeName, StructType.class.getSimpleName(),
+ Arrays.toString(structDef.attributeDefinitions));
+ System.out.println(summary);
+ }
+ for (HierarchicalTypeDefinition<ClassType> classDef : model.classTypesAsJavaList()) {
+ String summary = String.format("%s(%s) - super types [%s] - attributes %s", classDef.typeName, ClassType.class.getSimpleName(),
+ StringUtils.join(classDef.superTypes, ","), Arrays.toString(classDef.attributeDefinitions));
+ System.out.println(summary);
+ }
+ for (HierarchicalTypeDefinition<TraitType> traitDef : model.traitTypesAsJavaList()) {
+ String summary = String.format("%s(%s) - %s", traitDef.typeName, TraitType.class.getSimpleName(),
+ Arrays.toString(traitDef.attributeDefinitions));
+ System.out.println(summary);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java
new file mode 100644
index 0000000..f1f350b
--- /dev/null
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.falcon.model;
+
+/**
+ * Names of the Falcon types used by the model and the bridge.
+ * Each constant carries the type name string returned by {@link #getName()}.
+ */
+public enum FalconDataTypes {
+
+    FALCON_PROCESS_ENTITY("falcon_process");
+
+    /** Type name string associated with this constant. */
+    private final String typeName;
+
+    FalconDataTypes(String typeName) {
+        this.typeName = typeName;
+    }
+
+    /**
+     * @return the type name string supplied when the constant was declared
+     */
+    public String getName() {
+        return typeName;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java
new file mode 100644
index 0000000..7f67407
--- /dev/null
+++ b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/Util/EventUtil.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.atlas.Util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Static helpers for building Falcon events.
+ */
+public final class EventUtil {
+    private static final Logger LOG = LoggerFactory.getLogger(EventUtil.class);
+
+    private EventUtil() {}
+
+    /**
+     * Parses a comma-separated list of {@code key=value} entries into a map.
+     *
+     * <p>Each entry is split at its first {@code '='}; key and value are trimmed, so
+     * input written as {@code "a=1, b=2"} yields the keys {@code "a"} and {@code "b"}
+     * rather than {@code "a"} and {@code " b"}. Entries that contain no {@code '='}
+     * are skipped with a warning instead of aborting the whole parse. When a key
+     * occurs more than once, the last occurrence wins.
+     *
+     * @param keyValueString the text to parse, may be {@code null}
+     * @return a mutable map of the parsed entries, or {@code null} when the input is
+     *         {@code null}, empty or whitespace only
+     */
+    public static Map<String, String> convertKeyValueStringToMap(final String keyValueString) {
+        if (StringUtils.isBlank(keyValueString)) {
+            // Kept as null (not an empty map) so existing callers that test for null still work.
+            return null;
+        }
+
+        Map<String, String> keyValueMap = new HashMap<>();
+
+        for (String tag : keyValueString.split(",")) {
+            int index = tag.indexOf('=');
+            if (index < 0) {
+                // substring(0, -1) would throw; a malformed entry must not fail the whole string.
+                LOG.warn("Ignoring entry '{}' as it is not of the form key=value", tag);
+                continue;
+            }
+            String tagKey = tag.substring(0, index).trim();
+            String tagValue = tag.substring(index + 1).trim();
+            keyValueMap.put(tagKey, tagValue);
+        }
+        return keyValueMap;
+    }
+
+    /**
+     * Returns the value of {@code CurrentUser.getAuthenticatedUGI()}.
+     *
+     * @return the UGI reported by {@code CurrentUser}
+     * @throws FalconException wrapping any {@link IOException} raised by the lookup
+     */
+    public static UserGroupInformation getUgi() throws FalconException {
+        try {
+            return CurrentUser.getAuthenticatedUGI();
+        } catch (IOException ioe) {
+            throw new FalconException(ioe);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java
new file mode 100644
index 0000000..e587e73
--- /dev/null
+++ b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/event/FalconEvent.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.atlas.event;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Value holder describing one Falcon entity operation that is handed to a
+ * publisher: who performed it, which operation it was, when it happened and
+ * the entity concerned.
+ */
+public class FalconEvent {
+
+    /**
+     * Kinds of operation an event can describe.
+     */
+    public enum OPERATION {
+        ADD_PROCESS, UPDATE_PROCESS
+    }
+
+    /** User name supplied to the constructor as {@code doAsUser}. */
+    protected String user;
+    /** UGI supplied to the constructor. */
+    protected UserGroupInformation ugi;
+    /** Operation this event describes. */
+    protected OPERATION operation;
+    /** Timestamp supplied to the constructor (callers in this module pass epoch milliseconds). */
+    protected long timestamp;
+    /** Entity the operation applies to. */
+    protected Entity entity;
+
+    /**
+     * Creates an event; every argument is stored as given.
+     *
+     * @param doAsUser        user name returned by {@link #getUser()}
+     * @param ugi             UGI returned by {@link #getUgi()}
+     * @param falconOperation operation returned by {@link #getOperation()}
+     * @param timestamp       value returned by {@link #getTimestamp()}
+     * @param entity          entity returned by {@link #getEntity()}
+     */
+    public FalconEvent(String doAsUser, UserGroupInformation ugi, OPERATION falconOperation, long timestamp, Entity entity) {
+        this.entity = entity;
+        this.timestamp = timestamp;
+        this.operation = falconOperation;
+        this.ugi = ugi;
+        this.user = doAsUser;
+    }
+
+    /** @return the user name given at construction */
+    public String getUser() {
+        return this.user;
+    }
+
+    /** @return the UGI given at construction */
+    public UserGroupInformation getUgi() {
+        return this.ugi;
+    }
+
+    /** @return the operation given at construction */
+    public OPERATION getOperation() {
+        return this.operation;
+    }
+
+    /** @return the timestamp given at construction */
+    public long getTimestamp() {
+        return this.timestamp;
+    }
+
+    /** @return the entity given at construction */
+    public Entity getEntity() {
+        return this.entity;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java
new file mode 100644
index 0000000..3522339
--- /dev/null
+++ b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.atlas.publisher;
+
+
+import org.apache.falcon.atlas.event.FalconEvent;
+
+/**
+ * Base class for components that publish {@link FalconEvent}s.
+ * Implementations receive the event wrapped in a {@link Data} object.
+ */
+public abstract class FalconEventPublisher {
+
+    /**
+     * Publishes the event carried by {@code data}.
+     *
+     * @param data wrapper around the event to publish
+     * @throws Exception if the implementation fails to publish
+     */
+    public abstract void publish(final Data data) throws Exception;
+
+    /**
+     * Payload passed to {@link #publish(Data)}; wraps a single event.
+     */
+    public static class Data {
+        /** The wrapped event; assigned once in the constructor. */
+        private final FalconEvent event;
+
+        /**
+         * @param event the event to wrap
+         */
+        public Data(FalconEvent event) {
+            this.event = event;
+        }
+
+        /**
+         * @return the event given at construction
+         */
+        public FalconEvent getEvent() {
+            return this.event;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java
new file mode 100644
index 0000000..373846d
--- /dev/null
+++ b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/service/AtlasService.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.atlas.service;
+
+import org.apache.atlas.falcon.hook.FalconHook;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.atlas.Util.EventUtil;
+import org.apache.falcon.atlas.event.FalconEvent;
+import org.apache.falcon.atlas.publisher.FalconEventPublisher;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.service.ConfigurationChangeListener;
+import org.apache.falcon.service.FalconService;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Falcon service that registers itself as a {@link ConfigurationChangeListener}
+ * on the {@link ConfigurationStore} and forwards process entity additions and
+ * changes to a {@link FalconEventPublisher}. Entities of any other type are
+ * only logged at debug level.
+ */
+public class AtlasService implements FalconService, ConfigurationChangeListener {
+
+    /**
+     * Constant for the service name.
+     */
+    public static final String SERVICE_NAME = AtlasService.class.getSimpleName();
+
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasService.class);
+
+    /** Publisher that receives the events; created in {@link #init()}. */
+    private FalconEventPublisher publisher;
+
+    @Override
+    public String getName() {
+        return SERVICE_NAME;
+    }
+
+    /**
+     * Registers this service as a store listener and creates the publisher.
+     */
+    @Override
+    public void init() throws FalconException {
+        ConfigurationStore.get().registerListener(this);
+        publisher = new FalconHook();
+    }
+
+    /**
+     * Removes this service from the store's listeners.
+     */
+    @Override
+    public void destroy() throws FalconException {
+        ConfigurationStore.get().unregisterListener(this);
+    }
+
+    @Override
+    public void onAdd(Entity entity) throws FalconException {
+        publishIfProcess(entity, FalconEvent.OPERATION.ADD_PROCESS);
+    }
+
+    /**
+     * Removals are not forwarded; this callback does nothing.
+     */
+    @Override
+    public void onRemove(Entity entity) throws FalconException {
+    }
+
+    /**
+     * Forwards the new version of a changed entity; {@code oldEntity} is not used.
+     */
+    @Override
+    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
+        publishIfProcess(newEntity, FalconEvent.OPERATION.UPDATE_PROCESS);
+    }
+
+    @Override
+    public void onReload(Entity entity) throws FalconException {
+        //Since there is no import script that can import existing falcon entities to atlas, adding on falcon service start
+        onAdd(entity);
+    }
+
+    /**
+     * Publishes {@code entity} with the given operation when it is a process;
+     * other entity types are logged and otherwise ignored.
+     */
+    private void publishIfProcess(Entity entity, FalconEvent.OPERATION operation) throws FalconException {
+        EntityType entityType = entity.getEntityType();
+        switch (entityType) {
+        case PROCESS:
+            addProcessEntity((Process) entity, operation);
+            break;
+
+        default:
+            LOG.debug("Entity type not processed " + entityType);
+        }
+    }
+
+    /**
+     * Wraps the process in a {@link FalconEvent} and hands it to the publisher.
+     * The event's user is the ACL owner when the process has an ACL, otherwise
+     * the short name of the login user.
+     *
+     * @throws FalconException wrapping any failure while building or publishing the event
+     */
+    private void addProcessEntity(Process entity, FalconEvent.OPERATION operation) throws FalconException {
+        LOG.info("Adding process entity to Atlas: {}", entity.getName());
+
+        try {
+            String user;
+            if (entity.getACL() != null) {
+                user = entity.getACL().getOwner();
+            } else {
+                user = UserGroupInformation.getLoginUser().getShortUserName();
+            }
+
+            FalconEvent event = new FalconEvent(user, EventUtil.getUgi(), operation, System.currentTimeMillis(), entity);
+            publisher.publish(new FalconEventPublisher.Data(event));
+        } catch (Exception ex) {
+            throw new FalconException("Unable to publish data to publisher " + ex.getMessage(), ex);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
new file mode 100644
index 0000000..12b7a8b
--- /dev/null
+++ b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.falcon.hook;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.falcon.model.FalconDataTypes;
+import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.falcon.atlas.service.AtlasService;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.security.CurrentUser;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.JAXBException;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Integration test: publishes cluster, feed and process entities to the Falcon
+ * {@link ConfigurationStore} with an {@link AtlasService} listening, then checks
+ * through the Atlas client that the process and its input/output tables are found.
+ */
+public class FalconHookIT {
+    public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(FalconHookIT.class);
+
+    public static final String CLUSTER_RESOURCE = "/cluster.xml";
+    public static final String FEED_RESOURCE = "/feed.xml";
+    public static final String PROCESS_RESOURCE = "/process.xml";
+
+    private static final ConfigurationStore STORE = ConfigurationStore.get();
+
+    private AtlasClient atlasClient;
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        String atlasEndpoint = ApplicationProperties.get().getString("atlas.rest.address");
+        atlasClient = new AtlasClient(atlasEndpoint);
+
+        AtlasService atlasService = new AtlasService();
+        atlasService.init();
+        STORE.registerListener(atlasService);
+        CurrentUser.authenticate(System.getProperty("user.name"));
+    }
+
+    /**
+     * Unmarshals the entity definition found at {@code resource} and overrides its name.
+     *
+     * @param type     entity type whose unmarshaller is used
+     * @param resource classpath resource holding the XML definition
+     * @param name     name to set on the loaded entity
+     */
+    @SuppressWarnings("unchecked")
+    private <T extends Entity> T loadEntity(EntityType type, String resource, String name) throws JAXBException {
+        Entity loaded = (Entity) type.getUnmarshaller().unmarshal(this.getClass().getResourceAsStream(resource));
+        switch (loaded.getEntityType()) {
+        case CLUSTER:
+            ((Cluster) loaded).setName(name);
+            break;
+
+        case FEED:
+            ((Feed) loaded).setName(name);
+            break;
+
+        case PROCESS:
+            ((Process) loaded).setName(name);
+            break;
+
+        default:
+            break;
+        }
+        return (T) loaded;
+    }
+
+    /** @return a random 10-character alphanumeric suffix */
+    private String random() {
+        return RandomStringUtils.randomAlphanumeric(10);
+    }
+
+    private String getTableUri(String dbName, String tableName) {
+        return String.format("catalog:%s:%s#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}", dbName, tableName);
+    }
+
+    /**
+     * Loads the feed fixture under a random name starting with {@code namePrefix},
+     * points its first cluster at {@code clusterName} and the given table, and
+     * publishes it to the store.
+     */
+    private Feed publishFeed(String namePrefix, String clusterName, String dbName, String tableName)
+        throws Exception {
+        Feed feed = loadEntity(EntityType.FEED, FEED_RESOURCE, namePrefix + random());
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
+        feedCluster.setName(clusterName);
+        feedCluster.getTable().setUri(getTableUri(dbName, tableName));
+        STORE.publish(EntityType.FEED, feed);
+        return feed;
+    }
+
+    /**
+     * Resolves the first id stored under {@code attribute} on the process entity
+     * and returns the {@code name} attribute of the entity it refers to.
+     */
+    private Object firstReferencedName(Referenceable processEntity, String attribute) throws Exception {
+        Id referencedId = (Id) ((List) processEntity.get(attribute)).get(0);
+        Referenceable referenced = atlasClient.getEntity(referencedId._getId());
+        return referenced.get("name");
+    }
+
+    @Test (enabled = true)
+    public void testCreateProcess() throws Exception {
+        Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
+        STORE.publish(EntityType.CLUSTER, cluster);
+        String clusterName = cluster.getName();
+
+        String inTableName = "table" + random();
+        String inDbName = "db" + random();
+        Feed infeed = publishFeed("feedin", clusterName, inDbName, inTableName);
+
+        String outTableName = "table" + random();
+        String outDbName = "db" + random();
+        Feed outfeed = publishFeed("feedout", clusterName, outDbName, outTableName);
+
+        Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
+        process.getClusters().getClusters().get(0).setName(clusterName);
+        process.getInputs().getInputs().get(0).setFeed(infeed.getName());
+        process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
+        STORE.publish(EntityType.PROCESS, process);
+
+        String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
+        Referenceable processEntity = atlasClient.getEntity(pid);
+        assertEquals(processEntity.get("processName"), process.getName());
+
+        assertEquals(firstReferencedName(processEntity, "inputs"),
+                HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), inDbName, inTableName));
+
+        assertEquals(firstReferencedName(processEntity, "outputs"),
+                HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName));
+    }
+
+//    @Test (enabled = true, dependsOnMethods = "testCreateProcess")
+//    public void testUpdateProcess() throws Exception {
+//        FalconEvent event = createProcessEntity(PROCESS_NAME_2, INPUT, OUTPUT);
+//        FalconEventPublisher.Data data = new FalconEventPublisher.Data(event);
+//        hook.publish(data);
+//        String id = assertProcessIsRegistered(CLUSTER_NAME, PROCESS_NAME_2);
+//        event = createProcessEntity(PROCESS_NAME_2, INPUT_2, OUTPUT_2);
+//        hook.publish(data);
+//        String id2 = assertProcessIsRegistered(CLUSTER_NAME, PROCESS_NAME_2);
+//        if (!id.equals(id2)) {
+//            throw new Exception("Id mismatch");
+//        }
+//    }
+
+    /**
+     * Searches for the process named {@code processName@clusterName} and returns its id.
+     */
+    private String assertProcessIsRegistered(String clusterName, String processName) throws Exception {
+        String qualifiedName = processName + "@" + clusterName;
+        LOG.debug("Searching for process {}", qualifiedName);
+        String typeName = FalconDataTypes.FALCON_PROCESS_ENTITY.getName();
+        String query = String.format("%s as t where name = '%s' select t", typeName, qualifiedName);
+        return assertEntityIsRegistered(query);
+    }
+
+    /**
+     * Waits up to 20 seconds for {@code query} to return exactly one row, then
+     * runs the query once more and returns the {@code id} of the row's {@code t} object.
+     */
+    private String assertEntityIsRegistered(final String query) throws Exception {
+        Predicate exactlyOneResult = new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                JSONArray found = atlasClient.search(query);
+                System.out.println(found);
+                return found.length() == 1;
+            }
+        };
+        waitFor(20000, exactlyOneResult);
+
+        JSONArray rows = atlasClient.search(query);
+        JSONObject entityRow = rows.getJSONObject(0).getJSONObject("t");
+        return entityRow.getString("id");
+    }
+
+
+    public interface Predicate {
+
+        /**
+         * Perform a predicate evaluation.
+         *
+         * @return the boolean result of the evaluation.
+         * @throws Exception thrown if the predicate evaluation could not evaluate.
+         */
+        boolean evaluate() throws Exception;
+    }
+
+    /**
+     * Polls a {@link Predicate} about once per second until it returns true or the timeout elapses.
+     *
+     * @param timeout maximum time in milliseconds to wait for the predicate to become true.
+     * @param predicate predicate waiting on.
+     * @throws Exception if the predicate is still false once the timeout has elapsed,
+     *                   or if the predicate itself throws.
+     */
+    protected void waitFor(int timeout, Predicate predicate) throws Exception {
+        long deadline = System.currentTimeMillis() + timeout;
+
+        boolean satisfied = predicate.evaluate();
+        while (!satisfied && System.currentTimeMillis() < deadline) {
+            LOG.info("Waiting up to {} msec", deadline - System.currentTimeMillis());
+            Thread.sleep(1000);
+            satisfied = predicate.evaluate();
+        }
+        if (!satisfied) {
+            throw new Exception("Waiting timed out after " + timeout + " msec");
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/test/resources/cluster.xml
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/test/resources/cluster.xml b/addons/falcon-bridge/src/test/resources/cluster.xml
new file mode 100644
index 0000000..b183847
--- /dev/null
+++ b/addons/falcon-bridge/src/test/resources/cluster.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<!--
+ Primary cluster configuration for demo vm
+ -->
+<cluster colo="west-coast" description="Primary Cluster" name="testcluster"
+ xmlns="uri:falcon:cluster:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+ <interfaces>
+ <interface type="readonly" endpoint="hftp://localhost:10070" version="1.1.1" />
+
+ <interface type="write" endpoint="hdfs://localhost:10020" version="1.1.1" />
+
+ <interface type="execute" endpoint="localhost:10300" version="1.1.1" />
+
+ <interface type="workflow" endpoint="http://localhost:11010/oozie/" version="3.3.0" />
+
+ <interface type="registry" endpoint="thrift://localhost:19083" version="0.11.0" />
+
+ <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" version="5.4.3" />
+ </interfaces>
+
+ <locations>
+ <location name="staging" path="/apps/falcon/staging" />
+ <location name="temp" path="/tmp" />
+ <location name="working" path="/apps/falcon/working" />
+ </locations>
+
+</cluster>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/test/resources/feed.xml
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/test/resources/feed.xml b/addons/falcon-bridge/src/test/resources/feed.xml
new file mode 100644
index 0000000..473c745
--- /dev/null
+++ b/addons/falcon-bridge/src/test/resources/feed.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<feed description="test input" name="testinput" xmlns="uri:falcon:feed:0.1">
+ <groups>online,bi</groups>
+
+ <frequency>hours(1)</frequency>
+ <timezone>UTC</timezone>
+ <late-arrival cut-off="hours(3)"/>
+
+ <clusters>
+ <cluster name="testcluster" type="source">
+ <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
+ <retention limit="hours(24)" action="delete"/>
+ <table uri="catalog:indb:intable#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
+ </cluster>
+ </clusters>
+
+ <table uri="catalog:indb:unused#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
+
+ <ACL owner="testuser" group="group" permission="0x755"/>
+ <schema location="hcat" provider="hcat"/>
+</feed>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/test/resources/hive-site.xml
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/test/resources/hive-site.xml b/addons/falcon-bridge/src/test/resources/hive-site.xml
new file mode 100644
index 0000000..b106903
--- /dev/null
+++ b/addons/falcon-bridge/src/test/resources/hive-site.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration>
+ <property>
+ <name>hive.exec.post.hooks</name>
+ <value>org.apache.atlas.hive.hook.HiveHook</value>
+ </property>
+
+ <property>
+ <name>hive.support.concurrency</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>hive.metastore.warehouse.dir</name>
+ <value>${user.dir}/target/metastore</value>
+ </property>
+
+ <property>
+ <name>javax.jdo.option.ConnectionURL</name>
+ <value>jdbc:derby:${user.dir}/target/metastore_db;create=true</value>
+ </property>
+
+ <property>
+ <name>atlas.hook.hive.synchronous</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>atlas.cluster.name</name>
+ <value>test</value>
+ </property>
+
+ <property>
+ <name>fs.pfile.impl</name>
+ <value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
+ </property>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/test/resources/process.xml
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/test/resources/process.xml b/addons/falcon-bridge/src/test/resources/process.xml
new file mode 100644
index 0000000..b94d0a8
--- /dev/null
+++ b/addons/falcon-bridge/src/test/resources/process.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<process name="testprocess" xmlns="uri:falcon:process:0.1">
+ <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting</tags>
+
+ <clusters>
+ <cluster name="testcluster">
+ <validity end="2012-04-22T00:00Z" start="2012-04-21T00:00Z"/>
+ </cluster>
+ </clusters>
+
+ <parallel>1</parallel>
+ <order>FIFO</order>
+ <frequency>days(1)</frequency>
+ <timezone>UTC</timezone>
+
+ <inputs>
+ <input end="today(0,0)" start="today(0,0)" feed="testinput" name="input"/>
+ </inputs>
+
+ <outputs>
+ <output instance="now(0,0)" feed="testoutput" name="output"/>
+ </outputs>
+
+ <properties>
+ <property name="blah" value="blah"/>
+ </properties>
+
+ <workflow engine="hive" path="/falcon/test/apps/hive/script.hql"/>
+
+ <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+
+ <late-process policy="exp-backoff" delay="hours(2)">
+ <late-input input="input" workflow-path="/falcon/test/workflow"/>
+ </late-process>
+</process>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/falcon-bridge/src/test/resources/startup.properties
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/test/resources/startup.properties b/addons/falcon-bridge/src/test/resources/startup.properties
new file mode 100644
index 0000000..2d0dba1
--- /dev/null
+++ b/addons/falcon-bridge/src/test/resources/startup.properties
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+*.domain=debug
+*.config.store.persist=false
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index ee5ae10..40babe5 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -18,6 +18,7 @@
package org.apache.atlas.hive.bridge;
+import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
@@ -487,9 +488,11 @@ public class HiveMetaStoreBridge {
dgiClient.getType(HiveDataTypes.HIVE_PROCESS.getName());
LOG.info("Hive data model is already registered!");
} catch(AtlasServiceException ase) {
- //Expected in case types do not exist
- LOG.info("Registering Hive data model");
- dgiClient.createType(dataModelGenerator.getModelAsJson());
+ if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
+ //Expected in case types do not exist
+ LOG.info("Registering Hive data model");
+ dgiClient.createType(dataModelGenerator.getModelAsJson());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/086b4a3e/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
index 994c813..1eb2acf 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
@@ -227,7 +227,7 @@ public class HiveDataModelGenerator {
null),
new AttributeDefinition("description", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
- new AttributeDefinition("locationUri", DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
+ new AttributeDefinition("locationUri", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("ownerName", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,