You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by su...@apache.org on 2016/08/02 20:18:38 UTC
incubator-atlas git commit: ATLAS-965 Old lineage still exists after
dropping tables and re-creating tables with same name. (shwethags via
sumasai)
Repository: incubator-atlas
Updated Branches:
refs/heads/master 8a3c167ab -> 838a0d459
ATLAS-965 Old lineage still exists after dropping tables and re-creating tables with same name. (shwethags via sumasai)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/838a0d45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/838a0d45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/838a0d45
Branch: refs/heads/master
Commit: 838a0d4594385b2106867ee7fcd31982c448cd0b
Parents: 8a3c167
Author: Suma Shivaprasad <su...@gmail.com>
Authored: Tue Aug 2 13:18:27 2016 -0700
Committer: Suma Shivaprasad <su...@gmail.com>
Committed: Tue Aug 2 13:18:27 2016 -0700
----------------------------------------------------------------------
.../atlas/hive/bridge/HiveMetaStoreBridge.java | 112 +++++----
.../org/apache/atlas/hive/hook/HiveHook.java | 35 +--
.../java/org/apache/atlas/hive/HiveITBase.java | 236 +++++++++++++++++++
.../hive/bridge/HiveMetastoreBridgeIT.java | 83 +++++++
.../org/apache/atlas/hive/hook/HiveHookIT.java | 229 +++---------------
docs/pom.xml | 3 +
pom.xml | 2 +-
release-log.txt | 1 +
.../atlas/web/listeners/GuiceServletConfig.java | 5 +-
9 files changed, 444 insertions(+), 262 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/838a0d45/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index e0d8024..8d24a67 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -18,6 +18,7 @@
package org.apache.atlas.hive.bridge;
+import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
@@ -25,6 +26,7 @@ import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.fs.model.FSDataModel;
import org.apache.atlas.fs.model.FSDataTypes;
+import org.apache.atlas.hive.hook.HiveHook;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.typesystem.Referenceable;
@@ -272,60 +274,68 @@ public class HiveMetaStoreBridge {
List<String> hiveTables = hiveClient.getAllTables(databaseName);
LOG.info("Importing tables {} for db {}", hiveTables.toString(), databaseName);
for (String tableName : hiveTables) {
- try {
- Table table = hiveClient.getTable(databaseName, tableName);
- Referenceable tableReferenceable = registerTable(databaseReferenceable, table);
- tablesImported++;
- if (table.getTableType() == TableType.EXTERNAL_TABLE) {
- String tableQualifiedName = getTableQualifiedName(clusterName, table);
- Referenceable process = getProcessReference(tableQualifiedName);
- if (process == null) {
- LOG.info("Attempting to register create table process for {}", tableQualifiedName);
- Referenceable lineageProcess = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
- ArrayList<Referenceable> sourceList = new ArrayList<>();
- ArrayList<Referenceable> targetList = new ArrayList<>();
- String tableLocation = table.getDataLocation().toString();
- Referenceable path = fillHDFSDataSet(tableLocation);
- String query = getCreateTableString(table, tableLocation);
- sourceList.add(path);
- targetList.add(tableReferenceable);
- lineageProcess.set("inputs", sourceList);
- lineageProcess.set("outputs", targetList);
- lineageProcess.set("userName", table.getOwner());
- lineageProcess.set("startTime", new Date(System.currentTimeMillis()));
- lineageProcess.set("endTime", new Date(System.currentTimeMillis()));
- lineageProcess.set("operationType", "CREATETABLE");
- lineageProcess.set("queryText", query);
- lineageProcess.set("queryId", query);
- lineageProcess.set("queryPlan", "{}");
- lineageProcess.set("clusterName", clusterName);
- List<String> recentQueries = new ArrayList<>(1);
- recentQueries.add(query);
- lineageProcess.set("recentQueries", recentQueries);
- lineageProcess.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
- lineageProcess.set(AtlasClient.NAME, query);
- registerInstance(lineageProcess);
- } else {
- LOG.info("Process {} is already registered", process.toString());
- }
- }
- } catch (Exception e) {
- LOG.error("Import failed for hive_table {} ", tableName, e);
- if (failOnError) {
- throw e;
- }
- }
+ int imported = importTable(databaseReferenceable, databaseName, tableName, failOnError);
+ tablesImported += imported;
}
- if ( tablesImported == hiveTables.size()) {
+ if (tablesImported == hiveTables.size()) {
LOG.info("Successfully imported all {} tables from {} ", tablesImported, databaseName);
} else {
- LOG.error("Unable to import {} tables out of {} tables from {}", tablesImported, hiveTables.size(), databaseName);
+ LOG.error("Able to import {} tables out of {} tables from {}. Please check logs for import errors", tablesImported, hiveTables.size(), databaseName);
}
return tablesImported;
}
+ @VisibleForTesting
+ public int importTable(Referenceable databaseReferenceable, String databaseName, String tableName, final boolean failOnError) throws Exception {
+ try {
+ Table table = hiveClient.getTable(databaseName, tableName);
+ Referenceable tableReferenceable = registerTable(databaseReferenceable, table);
+ if (table.getTableType() == TableType.EXTERNAL_TABLE) {
+ String tableQualifiedName = getTableQualifiedName(clusterName, table);
+ Referenceable process = getProcessReference(tableQualifiedName);
+ if (process == null) {
+ LOG.info("Attempting to register create table process for {}", tableQualifiedName);
+ Referenceable lineageProcess = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
+ ArrayList<Referenceable> sourceList = new ArrayList<>();
+ ArrayList<Referenceable> targetList = new ArrayList<>();
+ String tableLocation = table.getDataLocation().toString();
+ Referenceable path = fillHDFSDataSet(tableLocation);
+ String query = getCreateTableString(table, tableLocation);
+ sourceList.add(path);
+ targetList.add(tableReferenceable);
+ lineageProcess.set("inputs", sourceList);
+ lineageProcess.set("outputs", targetList);
+ lineageProcess.set("userName", table.getOwner());
+ lineageProcess.set("startTime", new Date(System.currentTimeMillis()));
+ lineageProcess.set("endTime", new Date(System.currentTimeMillis()));
+ lineageProcess.set("operationType", "CREATETABLE");
+ lineageProcess.set("queryText", query);
+ lineageProcess.set("queryId", query);
+ lineageProcess.set("queryPlan", "{}");
+ lineageProcess.set("clusterName", clusterName);
+ List<String> recentQueries = new ArrayList<>(1);
+ recentQueries.add(query);
+ lineageProcess.set("recentQueries", recentQueries);
+ String processQualifiedName = getTableProcessQualifiedName(clusterName, table);
+ lineageProcess.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQualifiedName);
+ lineageProcess.set(AtlasClient.NAME, query);
+ registerInstance(lineageProcess);
+ } else {
+ LOG.info("Process {} is already registered", process.toString());
+ }
+ }
+ return 1;
+ } catch (Exception e) {
+ LOG.error("Import failed for hive_table {} ", tableName, e);
+ if (failOnError) {
+ throw e;
+ }
+ return 0;
+ }
+ }
+
/**
* Gets reference for the table
*
@@ -389,6 +399,12 @@ public class HiveMetaStoreBridge {
return getTableQualifiedName(clusterName, table.getDbName(), table.getTableName(), table.isTemporary());
}
+ public static String getTableProcessQualifiedName(String clusterName, Table table) {
+ String tableQualifiedName = getTableQualifiedName(clusterName, table);
+ Date createdTime = getTableCreatedTime(table);
+ return tableQualifiedName + HiveHook.SEP + createdTime.getTime();
+ }
+
/**
* Construct the qualified name used to uniquely identify a Table instance in Atlas.
* @param clusterName Name of the cluster to which the Hive component belongs
@@ -412,6 +428,10 @@ public class HiveMetaStoreBridge {
return createOrUpdateTableInstance(dbReference, null, hiveTable);
}
+ private static Date getTableCreatedTime(Table table) {
+ return new Date(table.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR);
+ }
+
private Referenceable createOrUpdateTableInstance(Referenceable dbReference, Referenceable tableReference,
final Table hiveTable) throws Exception {
LOG.info("Importing objects from {}.{}", hiveTable.getDbName(), hiveTable.getTableName());
@@ -428,7 +448,7 @@ public class HiveMetaStoreBridge {
Date createDate = new Date();
if (hiveTable.getTTable() != null){
try {
- createDate = new Date(hiveTable.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR);
+ createDate = getTableCreatedTime(hiveTable);
LOG.debug("Setting create time to {} ", createDate);
tableReference.set(HiveDataModelGenerator.CREATE_TIME, createDate);
} catch(Exception ne) {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/838a0d45/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index e27e52c..40e8c5f 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -21,7 +21,6 @@ package org.apache.atlas.hive.hook;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import kafka.security.auth.Write;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
@@ -90,7 +89,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize";
public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
- static final String SEP = ":".intern();
+ public static final String SEP = ":".intern();
static final String IO_SEP = "->".intern();
private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>();
@@ -216,7 +215,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
break;
case CREATETABLE_AS_SELECT:
-
case CREATEVIEW:
case ALTERVIEW_AS:
case LOAD:
@@ -244,9 +242,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
case ALTERTABLE_PARTCOLTYPE:
handleEventOutputs(dgiBridge, event, Type.TABLE);
break;
+
case ALTERTABLE_RENAMECOL:
renameColumn(dgiBridge, event);
break;
+
case ALTERTABLE_LOCATION:
LinkedHashMap<Type, Referenceable> tablesUpdated = handleEventOutputs(dgiBridge, event, Type.TABLE);
if (tablesUpdated != null && tablesUpdated.size() > 0) {
@@ -547,7 +547,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return null;
}
- private Entity getEntityByType(Set<? extends Entity> entities, Type entityType) {
+ private static Entity getEntityByType(Set<? extends Entity> entities, Type entityType) {
for (Entity entity : entities) {
if (entity.getType() == entityType) {
return entity;
@@ -708,20 +708,14 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event,
sortedIps, sortedOps, hiveInputsMap, hiveOutputsMap);
- String tableQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), hiveTable);
- if (isCreateOp(event)){
- LOG.info("Overriding process qualified name to {}", tableQualifiedName);
- processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
- }
entities.addAll(tables.values());
entities.add(processReferenceable);
event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), entities));
}
-
}
- private boolean isCreateOp(HiveEventContext hiveEvent) {
+ private static boolean isCreateOp(HiveEventContext hiveEvent) {
if (HiveOperation.CREATETABLE.equals(hiveEvent.getOperation())
|| HiveOperation.CREATEVIEW.equals(hiveEvent.getOperation())
|| HiveOperation.ALTERVIEW_AS.equals(hiveEvent.getOperation())
@@ -733,11 +727,13 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
private Referenceable getProcessReferenceable(HiveMetaStoreBridge dgiBridge, HiveEventContext hiveEvent,
- final SortedSet<ReadEntity> sortedHiveInputs, final SortedSet<WriteEntity> sortedHiveOutputs, SortedMap<ReadEntity, Referenceable> source, SortedMap<WriteEntity, Referenceable> target) {
+ final SortedSet<ReadEntity> sortedHiveInputs, final SortedSet<WriteEntity> sortedHiveOutputs, SortedMap<ReadEntity, Referenceable> source, SortedMap<WriteEntity, Referenceable> target)
+ throws HiveException {
Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
String queryStr = lower(hiveEvent.getQueryStr());
- processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(hiveEvent, sortedHiveInputs, sortedHiveOutputs, source, target));
+ processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+ getProcessQualifiedName(dgiBridge, hiveEvent, sortedHiveInputs, sortedHiveOutputs, source, target));
LOG.debug("Registering query: {}", queryStr);
List<Referenceable> sourceList = new ArrayList<>(source.values());
@@ -770,8 +766,19 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
@VisibleForTesting
- static String getProcessQualifiedName(HiveEventContext eventContext, final SortedSet<ReadEntity> sortedHiveInputs, final SortedSet<WriteEntity> sortedHiveOutputs, SortedMap<ReadEntity, Referenceable> hiveInputsMap, SortedMap<WriteEntity, Referenceable> hiveOutputsMap) {
+ static String getProcessQualifiedName(HiveMetaStoreBridge dgiBridge, HiveEventContext eventContext,
+ final SortedSet<ReadEntity> sortedHiveInputs,
+ final SortedSet<WriteEntity> sortedHiveOutputs,
+ SortedMap<ReadEntity, Referenceable> hiveInputsMap,
+ SortedMap<WriteEntity, Referenceable> hiveOutputsMap) throws HiveException {
HiveOperation op = eventContext.getOperation();
+ if (isCreateOp(eventContext)) {
+ Table outTable = getEntityByType(sortedHiveOutputs, Type.TABLE).getTable();
+ //refresh table
+ outTable = dgiBridge.hiveClient.getTable(outTable.getDbName(), outTable.getTableName());
+ return HiveMetaStoreBridge.getTableProcessQualifiedName(dgiBridge.getClusterName(), outTable);
+ }
+
StringBuilder buffer = new StringBuilder(op.getOperationName());
boolean ignoreHDFSPathsinQFName = ignoreHDFSPathsinQFName(op, sortedHiveInputs, sortedHiveOutputs);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/838a0d45/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java
new file mode 100644
index 0000000..c90fec5
--- /dev/null
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/HiveITBase.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.hive;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.fs.model.FSDataTypes;
+import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
+import org.apache.atlas.hive.hook.HiveHookIT;
+import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.utils.ParamChecker;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+
+import java.io.File;
+import java.util.List;
+
+import static org.apache.atlas.AtlasClient.NAME;
+import static org.apache.atlas.hive.hook.HiveHook.lower;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
+
+public class HiveITBase {
+ private static final Logger LOG = LoggerFactory.getLogger(HiveITBase.class);
+
+ protected static final String DGI_URL = "http://localhost:21000/";
+ protected static final String CLUSTER_NAME = "primary";
+ public static final String DEFAULT_DB = "default";
+
+ protected static final String PART_FILE = "2015-01-01";
+ protected Driver driver;
+ protected AtlasClient atlasClient;
+ protected HiveMetaStoreBridge hiveMetaStoreBridge;
+ protected SessionState ss;
+
+ protected HiveConf conf;
+
+ protected static final String INPUTS = AtlasClient.PROCESS_ATTRIBUTE_INPUTS;
+ protected static final String OUTPUTS = AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS;
+ protected Driver driverWithoutContext;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ //Set-up hive session
+ conf = new HiveConf();
+ conf.setClassLoader(Thread.currentThread().getContextClassLoader());
+ driver = new Driver(conf);
+ ss = new SessionState(conf);
+ ss = SessionState.start(ss);
+
+ SessionState.setCurrentSessionState(ss);
+
+ Configuration configuration = ApplicationProperties.get();
+ atlasClient = new AtlasClient(configuration.getString(HiveMetaStoreBridge.ATLAS_ENDPOINT, DGI_URL));
+
+ hiveMetaStoreBridge = new HiveMetaStoreBridge(configuration, conf, atlasClient);
+ hiveMetaStoreBridge.registerHiveDataModel();
+
+ HiveConf conf = new HiveConf();
+ conf.set("hive.exec.post.hooks", "");
+ SessionState ss = new SessionState(conf);
+ ss = SessionState.start(ss);
+ SessionState.setCurrentSessionState(ss);
+ driverWithoutContext = new Driver(conf);
+ }
+
+ protected void runCommand(String cmd) throws Exception {
+ runCommandWithDelay(cmd, 0);
+ }
+
+ protected void runCommand(Driver driver, String cmd) throws Exception {
+ runCommandWithDelay(driver, cmd, 0);
+ }
+
+ protected void runCommandWithDelay(String cmd, int sleepMs) throws Exception {
+ runCommandWithDelay(driver, cmd, sleepMs);
+ }
+
+ protected void runCommandWithDelay(Driver driver, String cmd, int sleepMs) throws Exception {
+ LOG.debug("Running command '{}'", cmd);
+ ss.setCommandType(null);
+ CommandProcessorResponse response = driver.run(cmd);
+ assertEquals(response.getResponseCode(), 0);
+ if (sleepMs != 0) {
+ Thread.sleep(sleepMs);
+ }
+ }
+
+ protected String createTestDFSPath(String path) throws Exception {
+ return "pfile://" + mkdir(path);
+ }
+
+ protected String mkdir(String tag) throws Exception {
+ String filename = "./target/" + tag + "-data-" + random();
+ File file = new File(filename);
+ file.mkdirs();
+ return file.getAbsolutePath();
+ }
+
+ protected String random() {
+ return RandomStringUtils.randomAlphanumeric(10);
+ }
+
+ protected String tableName() {
+ return "table" + random();
+ }
+
+ protected String assertTableIsRegistered(String dbName, String tableName) throws Exception {
+ return assertTableIsRegistered(dbName, tableName, null, false);
+ }
+
+ protected String assertTableIsRegistered(String dbName, String tableName, HiveHookIT.AssertPredicate assertPredicate, boolean isTemporary) throws Exception {
+ LOG.debug("Searching for table {}.{}", dbName, tableName);
+ String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName, isTemporary);
+ return assertEntityIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName,
+ assertPredicate);
+ }
+
+ protected String assertEntityIsRegistered(final String typeName, final String property, final String value,
+ final HiveHookIT.AssertPredicate assertPredicate) throws Exception {
+ waitFor(80000, new HiveHookIT.Predicate() {
+ @Override
+ public void evaluate() throws Exception {
+ Referenceable entity = atlasClient.getEntity(typeName, property, value);
+ assertNotNull(entity);
+ if (assertPredicate != null) {
+ assertPredicate.assertOnEntity(entity);
+ }
+ }
+ });
+ Referenceable entity = atlasClient.getEntity(typeName, property, value);
+ return entity.getId()._getId();
+ }
+
+ public interface AssertPredicate {
+ void assertOnEntity(Referenceable entity) throws Exception;
+ }
+
+ public interface Predicate {
+ /**
+ * Perform a predicate evaluation.
+ *
+ * @return the boolean result of the evaluation.
+ * @throws Exception thrown if the predicate evaluation could not evaluate.
+ */
+ void evaluate() throws Exception;
+ }
+
+ /**
+ * Wait for a condition, expressed via a {@link Predicate} to become true.
+ *
+ * @param timeout maximum time in milliseconds to wait for the predicate to become true.
+ * @param predicate predicate waiting on.
+ */
+ protected void waitFor(int timeout, Predicate predicate) throws Exception {
+ ParamChecker.notNull(predicate, "predicate");
+ long mustEnd = System.currentTimeMillis() + timeout;
+
+ while (true) {
+ try {
+ predicate.evaluate();
+ return;
+ } catch(Error | Exception e) {
+ if (System.currentTimeMillis() >= mustEnd) {
+ fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e);
+ }
+ LOG.debug("Waiting up to " + (mustEnd - System.currentTimeMillis()) + " msec as assertion failed", e);
+ Thread.sleep(5000);
+ }
+ }
+ }
+
+ protected String getTableProcessQualifiedName(String dbName, String tableName) throws Exception {
+ return HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME,
+ hiveMetaStoreBridge.hiveClient.getTable(dbName, tableName));
+ }
+
+ protected void validateHDFSPaths(Referenceable processReference, String attributeName, String... testPaths) throws Exception {
+ List<Id> hdfsPathRefs = (List<Id>) processReference.get(attributeName);
+
+ for (int i = 0; i < testPaths.length; i++) {
+ final String testPathNormed = lower(new Path(testPaths[i]).toString());
+ String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed);
+ Assert.assertEquals(hdfsPathRefs.get(0)._getId(), hdfsPathId);
+
+ Referenceable hdfsPathRef = atlasClient.getEntity(hdfsPathId);
+ Assert.assertEquals(hdfsPathRef.get("path"), testPathNormed);
+ Assert.assertEquals(hdfsPathRef.get(NAME), new Path(testPathNormed).getName());
+ Assert.assertEquals(hdfsPathRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), testPathNormed);
+ }
+ }
+
+ private String assertHDFSPathIsRegistered(String path) throws Exception {
+ LOG.debug("Searching for hdfs path {}", path);
+ return assertEntityIsRegistered(FSDataTypes.HDFS_PATH().toString(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, path, null);
+ }
+
+ protected String assertDatabaseIsRegistered(String dbName) throws Exception {
+ return assertDatabaseIsRegistered(dbName, null);
+ }
+
+ protected String assertDatabaseIsRegistered(String dbName, AssertPredicate assertPredicate) throws Exception {
+ LOG.debug("Searching for database {}", dbName);
+ String dbQualifiedName = HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, dbName);
+ return assertEntityIsRegistered(HiveDataTypes.HIVE_DB.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+ dbQualifiedName, assertPredicate);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/838a0d45/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetastoreBridgeIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetastoreBridgeIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetastoreBridgeIT.java
new file mode 100644
index 0000000..7e658a7
--- /dev/null
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetastoreBridgeIT.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.hive.bridge;
+
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.hive.HiveITBase;
+import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+
+public class HiveMetastoreBridgeIT extends HiveITBase {
+
+ @Test
+ public void testCreateTableAndImport() throws Exception {
+ String tableName = tableName();
+
+ String pFile = createTestDFSPath("parentPath");
+ final String query = String.format("create EXTERNAL table %s(id string, cnt int) location '%s'", tableName, pFile);
+ runCommand(query);
+ String dbId = assertDatabaseIsRegistered(DEFAULT_DB);
+ String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+
+ //verify lineage is created
+ String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(),
+ AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+ getTableProcessQualifiedName(DEFAULT_DB, tableName), null);
+ Referenceable processReference = atlasClient.getEntity(processId);
+ validateHDFSPaths(processReference, INPUTS, pFile);
+
+ List<Id> outputs = (List<Id>) processReference.get(OUTPUTS);
+ assertEquals(outputs.size(), 1);
+ assertEquals(outputs.get(0).getId()._getId(), tableId);
+
+ //Now import using import tool - should be no-op
+ hiveMetaStoreBridge.importTable(atlasClient.getEntity(dbId), DEFAULT_DB, tableName, true);
+ String tableId2 = assertTableIsRegistered(DEFAULT_DB, tableName);
+ assertEquals(tableId2, tableId);
+
+ String processId2 = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(),
+ AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+ getTableProcessQualifiedName(DEFAULT_DB, tableName), null);
+ assertEquals(processId2, processId);
+ }
+
+ @Test
+ public void testImportCreatedTable() throws Exception {
+ String tableName = tableName();
+ String pFile = createTestDFSPath("parentPath");
+ runCommand(driverWithoutContext, String.format("create EXTERNAL table %s(id string) location '%s'", tableName, pFile));
+ String dbId = assertDatabaseIsRegistered(DEFAULT_DB);
+
+ hiveMetaStoreBridge.importTable(atlasClient.getEntity(dbId), DEFAULT_DB, tableName, true);
+ String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+
+ String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(),
+ AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+ getTableProcessQualifiedName(DEFAULT_DB, tableName), null);
+ List<Id> outputs = (List<Id>) atlasClient.getEntity(processId).get(OUTPUTS);
+ assertEquals(outputs.size(), 1);
+ assertEquals(outputs.get(0).getId()._getId(), tableId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/838a0d45/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index 573f7f5..e61e916 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -21,10 +21,10 @@ package org.apache.atlas.hive.hook;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.sun.jersey.api.client.ClientResponse;
-import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.fs.model.FSDataTypes;
+import org.apache.atlas.hive.HiveITBase;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
@@ -32,29 +32,24 @@ import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.types.TypeSystem;
-import org.apache.atlas.utils.ParamChecker;
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
-import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.File;
@@ -73,55 +68,20 @@ import java.util.TreeMap;
import java.util.TreeSet;
import static org.apache.atlas.AtlasClient.NAME;
+import static org.apache.atlas.hive.hook.HiveHook.IO_SEP;
+import static org.apache.atlas.hive.hook.HiveHook.SEP;
import static org.apache.atlas.hive.hook.HiveHook.entityComparator;
import static org.apache.atlas.hive.hook.HiveHook.getProcessQualifiedName;
import static org.apache.atlas.hive.hook.HiveHook.lower;
-import static org.apache.atlas.hive.hook.HiveHook.IO_SEP;
-import static org.apache.atlas.hive.hook.HiveHook.SEP;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-public class HiveHookIT {
- private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HiveHookIT.class);
+public class HiveHookIT extends HiveITBase {
+ private static final Logger LOG = LoggerFactory.getLogger(HiveHookIT.class);
- private static final String DGI_URL = "http://localhost:21000/";
- private static final String CLUSTER_NAME = "primary";
- public static final String DEFAULT_DB = "default";
-
private static final String PART_FILE = "2015-01-01";
- private Driver driver;
- private AtlasClient atlasClient;
- private HiveMetaStoreBridge hiveMetaStoreBridge;
- private SessionState ss;
-
- private HiveConf conf;
-
- private static final String INPUTS = AtlasClient.PROCESS_ATTRIBUTE_INPUTS;
- private static final String OUTPUTS = AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS;
-
- @BeforeClass
- public void setUp() throws Exception {
- //Set-up hive session
- conf = new HiveConf();
- conf.setClassLoader(Thread.currentThread().getContextClassLoader());
- driver = new Driver(conf);
- ss = new SessionState(conf);
- ss = SessionState.start(ss);
-
- SessionState.setCurrentSessionState(ss);
-
- Configuration configuration = ApplicationProperties.get();
- atlasClient = new AtlasClient(configuration.getString(HiveMetaStoreBridge.ATLAS_ENDPOINT, DGI_URL));
-
- hiveMetaStoreBridge = new HiveMetaStoreBridge(configuration, conf, atlasClient);
- hiveMetaStoreBridge.registerHiveDataModel();
- }
-
- private void runCommand(String cmd) throws Exception {
- runCommandWithDelay(cmd, 0);
- }
@Test
public void testCreateDatabase() throws Exception {
@@ -158,10 +118,6 @@ public class HiveHookIT {
return dbName;
}
- private String tableName() {
- return "table" + random();
- }
-
private String columnName() {
return "col" + random();
}
@@ -260,8 +216,9 @@ public class HiveHookIT {
final String query = String.format("create TEMPORARY EXTERNAL table %s.%s( %s, %s) location '%s'", DEFAULT_DB , tableName , colName + " int", "name string", pFile);
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, tableName, null, true);
- String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
- HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName, true), null);
+ String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(),
+ AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+ getTableProcessQualifiedName(DEFAULT_DB, tableName), null);
Referenceable processReference = atlasClient.getEntity(processId);
assertEquals(processReference.get("userName"), UserGroupInformation.getCurrentUser().getShortUserName());
@@ -271,7 +228,7 @@ public class HiveHookIT {
validateHDFSPaths(processReference, INPUTS, pFile);
}
- private Set<ReadEntity> getInputs(String inputName, Entity.Type entityType) {
+ private Set<ReadEntity> getInputs(String inputName, Entity.Type entityType) throws HiveException {
final ReadEntity entity = new ReadEntity();
if ( Entity.Type.DFS_DIR.equals(entityType)) {
@@ -282,10 +239,14 @@ public class HiveHookIT {
entity.setTyp(entityType);
}
+ if (entityType == Entity.Type.TABLE) {
+ entity.setT(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, inputName));
+ }
+
return new LinkedHashSet<ReadEntity>() {{ add(entity); }};
}
- private Set<WriteEntity> getOutputs(String inputName, Entity.Type entityType) {
+ private Set<WriteEntity> getOutputs(String inputName, Entity.Type entityType) throws HiveException {
final WriteEntity entity = new WriteEntity();
if ( Entity.Type.DFS_DIR.equals(entityType) || Entity.Type.LOCAL_DIR.equals(entityType)) {
@@ -296,6 +257,9 @@ public class HiveHookIT {
entity.setTyp(entityType);
}
+ if (entityType == Entity.Type.TABLE) {
+ entity.setT(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, inputName));
+ }
return new LinkedHashSet<WriteEntity>() {{ add(entity); }};
}
@@ -373,7 +337,6 @@ public class HiveHookIT {
String command = "create table " + tableName + "(id int, name string) row format delimited lines terminated by '\n' null defined as ''";
runCommand(command);
assertTableIsRegistered(DEFAULT_DB, tableName);
-
}
@Test
@@ -392,19 +355,17 @@ public class HiveHookIT {
String processId = assertProcessIsRegistered(hiveEventContext);
final String drpquery = String.format("drop table %s ", ctasTableName);
- runCommand(drpquery);
+ runCommandWithDelay(drpquery, 100);
assertTableIsNotRegistered(DEFAULT_DB, ctasTableName);
- //TODO : Fix after ATLAS-876
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, ctasTableName);
+ outputs = getOutputs(ctasTableName, Entity.Type.TABLE);
String process2Id = assertProcessIsRegistered(hiveEventContext, inputs, outputs);
- Assert.assertEquals(process2Id, processId);
+ assertNotEquals(process2Id, processId);
Referenceable processRef = atlasClient.getEntity(processId);
-
- outputs.add(outputs.iterator().next());
validateOutputTables(processRef, outputs);
}
@@ -421,7 +382,6 @@ public class HiveHookIT {
@Test
public void testAlterViewAsSelect() throws Exception {
-
//Create the view from table1
String table1Name = createTable();
String viewName = tableName();
@@ -466,10 +426,6 @@ public class HiveHookIT {
Assert.assertEquals(vertices.length(), 0);
}
- private String createTestDFSPath(String path) throws Exception {
- return "pfile://" + mkdir(path);
- }
-
private String createTestDFSFile(String path) throws Exception {
return "pfile://" + file(path);
}
@@ -763,10 +719,6 @@ public class HiveHookIT {
//TODO -Add update test case
}
- private String random() {
- return RandomStringUtils.randomAlphanumeric(10);
- }
-
private String file(String tag) throws Exception {
String filename = "./target/" + tag + "-data-" + random();
File file = new File(filename);
@@ -774,13 +726,6 @@ public class HiveHookIT {
return file.getAbsolutePath();
}
- private String mkdir(String tag) throws Exception {
- String filename = "./target/" + tag + "-data-" + random();
- File file = new File(filename);
- file.mkdirs();
- return file.getAbsolutePath();
- }
-
@Test
public void testExportImportUnPartitionedTable() throws Exception {
String tableName = createTable(false);
@@ -1159,16 +1104,6 @@ public class HiveHookIT {
);
}
- private void runCommandWithDelay(String cmd, int sleepMs) throws CommandNeedRetryException, InterruptedException {
- LOG.debug("Running command '{}'", cmd);
- ss.setCommandType(null);
- CommandProcessorResponse response = driver.run(cmd);
- assertEquals(response.getResponseCode(), 0);
- if (sleepMs != 0) {
- Thread.sleep(sleepMs);
- }
- }
-
@Test
public void testTruncateTable() throws Exception {
String tableName = createTable(false);
@@ -1217,15 +1152,9 @@ public class HiveHookIT {
@Test
public void testAlterTableWithoutHookConf() throws Exception {
- HiveConf conf = new HiveConf();
- conf.set("hive.exec.post.hooks", "");
- SessionState ss = new SessionState(conf);
- ss = SessionState.start(ss);
- SessionState.setCurrentSessionState(ss);
- Driver driver = new Driver(conf);
String tableName = tableName();
String createCommand = "create table " + tableName + " (id int, name string)";
- driver.run(createCommand);
+ driverWithoutContext.run(createCommand);
assertTableIsNotRegistered(DEFAULT_DB, tableName);
String command = "alter table " + tableName + " change id id_new string";
runCommand(command);
@@ -1285,34 +1214,15 @@ public class HiveHookIT {
}
});
- String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
- HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName, false), null);
+ String processQualifiedName = getTableProcessQualifiedName(DEFAULT_DB, tableName);
+ String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(),
+ AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQualifiedName, null);
Referenceable processReference = atlasClient.getEntity(processId);
validateHDFSPaths(processReference, INPUTS, testPath);
}
- private void validateHDFSPaths(Referenceable processReference, String attributeName, String... testPaths) throws Exception {
- List<Id> hdfsPathRefs = (List<Id>) processReference.get(attributeName);
-
- for (int i = 0; i < testPaths.length; i++) {
- final String testPathNormed = lower(new Path(testPaths[i]).toString());
- String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed);
- Assert.assertEquals(hdfsPathRefs.get(0)._getId(), hdfsPathId);
-
- Referenceable hdfsPathRef = atlasClient.getEntity(hdfsPathId);
- Assert.assertEquals(hdfsPathRef.get("path"), testPathNormed);
- Assert.assertEquals(hdfsPathRef.get(NAME), new Path(testPathNormed).getName());
- Assert.assertEquals(hdfsPathRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), testPathNormed);
- }
- }
-
- private String assertHDFSPathIsRegistered(String path) throws Exception {
- LOG.debug("Searching for hdfs path {}", path);
- return assertEntityIsRegistered(FSDataTypes.HDFS_PATH().toString(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, path, null);
- }
-
@Test
public void testAlterTableFileFormat() throws Exception {
String tableName = createTable();
@@ -1614,7 +1524,7 @@ public class HiveHookIT {
}};
String query = String.format(fmtQuery, entityName, SET_OP, getSerializedProps(expectedProps));
- runCommand(query);
+ runCommandWithDelay(query, 1000);
verifyEntityProperties(entityType, entityName, expectedProps, false);
expectedProps.put("testPropKey2", "testPropValue2");
@@ -1710,7 +1620,7 @@ public class HiveHookIT {
sortedHiveOutputs.addAll(event.getOutputs());
}
- String processQFName = getProcessQualifiedName(event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
+ String processQFName = getProcessQualifiedName(hiveMetaStoreBridge, event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
LOG.debug("Searching for process with query {}", processQFName);
return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() {
@Override
@@ -1735,7 +1645,7 @@ public class HiveHookIT {
if ( event.getOutputs() != null) {
sortedHiveOutputs.addAll(event.getOutputs());
}
- String processQFName = getProcessQualifiedName(event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls));
+ String processQFName = getProcessQualifiedName(hiveMetaStoreBridge, event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls));
LOG.debug("Searching for process with query {}", processQFName);
return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() {
@Override
@@ -1777,7 +1687,7 @@ public class HiveHookIT {
if ( event.getOutputs() != null) {
sortedHiveOutputs.addAll(event.getOutputs());
}
- String processQFName = getProcessQualifiedName(event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
+ String processQFName = getProcessQualifiedName(hiveMetaStoreBridge, event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
LOG.debug("Searching for process with query {}", processQFName);
assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName);
} catch( Exception e) {
@@ -1803,49 +1713,10 @@ public class HiveHookIT {
assertEntityIsNotRegistered(HiveDataTypes.HIVE_DB.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, dbQualifiedName);
}
- private String assertTableIsRegistered(String dbName, String tableName) throws Exception {
- return assertTableIsRegistered(dbName, tableName, null, false);
- }
-
-
- private String assertTableIsRegistered(String dbName, String tableName, AssertPredicate assertPredicate, boolean isTemporary) throws Exception {
- LOG.debug("Searching for table {}.{}", dbName, tableName);
- String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName, isTemporary);
- return assertEntityIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName,
- assertPredicate);
- }
-
private String assertTableIsRegistered(String dbName, String tableName, AssertPredicate assertPredicate) throws Exception {
return assertTableIsRegistered(dbName, tableName, assertPredicate, false);
}
- private String assertDatabaseIsRegistered(String dbName) throws Exception {
- return assertDatabaseIsRegistered(dbName, null);
- }
-
- private String assertDatabaseIsRegistered(String dbName, AssertPredicate assertPredicate) throws Exception {
- LOG.debug("Searching for database {}", dbName);
- String dbQualifiedName = HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, dbName);
- return assertEntityIsRegistered(HiveDataTypes.HIVE_DB.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
- dbQualifiedName, assertPredicate);
- }
-
- private String assertEntityIsRegistered(final String typeName, final String property, final String value,
- final AssertPredicate assertPredicate) throws Exception {
- waitFor(80000, new Predicate() {
- @Override
- public void evaluate() throws Exception {
- Referenceable entity = atlasClient.getEntity(typeName, property, value);
- assertNotNull(entity);
- if (assertPredicate != null) {
- assertPredicate.assertOnEntity(entity);
- }
- }
- });
- Referenceable entity = atlasClient.getEntity(typeName, property, value);
- return entity.getId()._getId();
- }
-
private void assertEntityIsNotRegistered(final String typeName, final String property, final String value) throws Exception {
waitFor(80000, new Predicate() {
@Override
@@ -1894,42 +1765,4 @@ public class HiveHookIT {
runCommand("show compactions");
runCommand("show transactions");
}
-
- public interface AssertPredicate {
- void assertOnEntity(Referenceable entity) throws Exception;
- }
-
- public interface Predicate {
- /**
- * Perform a predicate evaluation.
- *
- * @return the boolean result of the evaluation.
- * @throws Exception thrown if the predicate evaluation could not evaluate.
- */
- void evaluate() throws Exception;
- }
-
- /**
- * Wait for a condition, expressed via a {@link Predicate} to become true.
- *
- * @param timeout maximum time in milliseconds to wait for the predicate to become true.
- * @param predicate predicate waiting on.
- */
- protected void waitFor(int timeout, Predicate predicate) throws Exception {
- ParamChecker.notNull(predicate, "predicate");
- long mustEnd = System.currentTimeMillis() + timeout;
-
- while (true) {
- try {
- predicate.evaluate();
- return;
- } catch(Error | Exception e) {
- if (System.currentTimeMillis() >= mustEnd) {
- fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e);
- }
- LOG.debug("Waiting up to " + (mustEnd - System.currentTimeMillis()) + " msec as assertion failed", e);
- Thread.sleep(5000);
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/838a0d45/docs/pom.xml
----------------------------------------------------------------------
diff --git a/docs/pom.xml b/docs/pom.xml
index 9b8440d..b0a5777 100755
--- a/docs/pom.xml
+++ b/docs/pom.xml
@@ -45,6 +45,9 @@
<skip>${skipDocs}</skip>
<dependencyDetailsEnabled>false</dependencyDetailsEnabled>
<dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+ <webAccessUrl>https://github.com/apache/incubator-atlas.git</webAccessUrl>
+ <anonymousConnection>scm:git://git.apache.org/incubator-atlas.git</anonymousConnection>
+ <developerConnection>scm:https://git-wip-us.apache.org/repos/asf/incubator-atlas.git</developerConnection>
</configuration>
<reportSets>
<reportSet>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/838a0d45/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4855d3d..506c68e 100755
--- a/pom.xml
+++ b/pom.xml
@@ -80,7 +80,7 @@
</mailingLists>
<scm>
- <connection>scm:git:https://github.com/apache/incubator-atlas.git</connection>
+ <connection>scm:git:git://git.apache.org/incubator-atlas.git</connection>
<developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-atlas.git</developerConnection>
<tag>HEAD</tag>
<url>https://github.com/apache/incubator-atlas.git</url>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/838a0d45/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index c16c62a..7b19fb4 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES:
ALL CHANGES:
+ATLAS-965 Old lineage still exists after dropping tables and re-creating tables with same name. (shwethags via sumasai)
ATLAS-1048 TestMetadata.py test in distro project fails on Windows (jnhagelb via shwethags)
ATLAS-1026 StoreBackedTypeCache issues (dkantor via shwethags)
ATLAS-861 1 table out of 50,000 tables is left unimported throwing exception during deserialization (sumasai via shwethags)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/838a0d45/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
index 1aee467..0a7c5df 100755
--- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
+++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
@@ -23,6 +23,7 @@ import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provider;
+import com.google.inject.Stage;
import com.google.inject.TypeLiteral;
import com.google.inject.servlet.GuiceServletContextListener;
import com.sun.jersey.api.core.PackagesResourceConfig;
@@ -39,12 +40,10 @@ import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.service.Services;
import org.apache.atlas.web.filters.ActiveServerFilter;
-import org.apache.atlas.web.filters.AtlasAuthenticationFilter;
import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.service.ActiveInstanceElectorModule;
import org.apache.atlas.web.service.ServiceModule;
import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;
@@ -75,7 +74,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
LoginProcessor loginProcessor = new LoginProcessor();
loginProcessor.login();
- injector = Guice.createInjector(getRepositoryModule(), new ActiveInstanceElectorModule(),
+ injector = Guice.createInjector(Stage.PRODUCTION, getRepositoryModule(), new ActiveInstanceElectorModule(),
new NotificationModule(), new ServiceModule(), new JerseyServletModule() {
private Configuration appConfiguration = null;