Posted to commits@atlas.apache.org by su...@apache.org on 2016/05/13 16:41:23 UTC
incubator-atlas git commit: ATLAS-619 Canonicalize hive queries (sumasai)
Repository: incubator-atlas
Updated Branches:
refs/heads/master b6a0eee7f -> 9e1f36637
ATLAS-619 Canonicalize hive queries (sumasai)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/9e1f3663
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/9e1f3663
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/9e1f3663
Branch: refs/heads/master
Commit: 9e1f366374827d5471a84c9ece438e89e814b7f8
Parents: b6a0eee
Author: Suma Shivaprasad <su...@gmail.com>
Authored: Fri May 13 09:41:14 2016 -0700
Committer: Suma Shivaprasad <su...@gmail.com>
Committed: Fri May 13 09:41:14 2016 -0700
----------------------------------------------------------------------
.../apache/atlas/falcon/hook/FalconHook.java | 6 +-
.../falcon/model/FalconDataModelGenerator.java | 3 -
.../apache/atlas/falcon/hook/FalconHookIT.java | 8 +-
addons/hive-bridge/pom.xml | 7 +
.../org/apache/atlas/hive/hook/HiveHook.java | 294 +++++++++++--------
.../hive/model/HiveDataModelGenerator.java | 2 +
.../apache/atlas/hive/rewrite/ASTRewriter.java | 26 ++
.../atlas/hive/rewrite/HiveASTRewriter.java | 95 ++++++
.../atlas/hive/rewrite/LiteralRewriter.java | 76 +++++
.../atlas/hive/rewrite/RewriteContext.java | 48 +++
.../atlas/hive/rewrite/RewriteException.java | 26 ++
.../hive/bridge/HiveLiteralRewriterTest.java | 67 +++++
.../org/apache/atlas/hive/hook/HiveHookIT.java | 219 +++++++++-----
.../src/test/resources/hive-site.xml | 10 +
.../org/apache/atlas/sqoop/hook/SqoopHook.java | 4 +-
.../apache/atlas/sqoop/hook/SqoopHookIT.java | 4 +-
.../apache/atlas/storm/hook/StormAtlasHook.java | 3 +-
.../atlas/storm/model/StormDataModel.scala | 3 +-
release-log.txt | 1 +
.../atlas/services/DefaultMetadataService.java | 19 +-
.../apache/atlas/BaseHiveRepositoryTest.java | 3 +-
.../org/apache/atlas/examples/QuickStart.java | 3 +-
.../org/apache/atlas/examples/QuickStartIT.java | 12 +-
.../resources/HiveLineageJerseyResourceIT.java | 6 +-
24 files changed, 719 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
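The heart of this change: HiveHook now passes query strings through the new HiveASTRewriter, which replaces literal tokens with placeholders, before using them in the hive_process qualified name, so re-runs of the same statement with different literal values map to one process entity. Below is a minimal sketch of the rewrite step (the API is taken from the new classes in this diff; the sample query, class name, and the assumption that a local hive-site.xml allows building a ql Context, as HiveLiteralRewriterTest sets up, are illustrative only and not part of this commit):

    import org.apache.atlas.hive.rewrite.HiveASTRewriter;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class CanonicalizeSketch {
        public static void main(String[] args) throws Exception {
            HiveConf conf = new HiveConf();                      // picks up hive-site.xml from the classpath
            HiveASTRewriter rewriter = new HiveASTRewriter(conf);
            String query = "select * from sales where dt = '2014-01-01' and qty = 10";
            // LiteralRewriter swaps each literal for a placeholder token, e.g.:
            //   select * from sales where dt = 'STRING_LITERAL' and qty = NUMBER_LITERAL
            String rewritten = rewriter.rewrite(query);
            // HiveHook.normalize() additionally lower-cases and trims the result
            // before it is combined with the input/output dataset names to form
            // the process qualified name (see getProcessQualifiedName below).
            System.out.println(rewritten);
        }
    }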
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
index 8fced05..97ee1a2 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java
@@ -211,10 +211,10 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
if (!inputs.isEmpty() || !outputs.isEmpty()) {
Referenceable processEntity = new Referenceable(FalconDataTypes.FALCON_PROCESS_ENTITY.getName());
- processEntity.set(FalconDataModelGenerator.NAME, String.format("%s@%s", process.getName(),
+ processEntity.set(FalconDataModelGenerator.NAME, String.format("%s", process.getName(),
cluster.getName()));
- processEntity.set(FalconDataModelGenerator.PROCESS_NAME, process.getName());
-
+ processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, String.format("%s@%s", process.getName(),
+ cluster.getName()));
processEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
if (!inputs.isEmpty()) {
processEntity.set(FalconDataModelGenerator.INPUTS, inputs);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java
index 2494675..397dea4 100644
--- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java
+++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java
@@ -57,7 +57,6 @@ public class FalconDataModelGenerator {
private final Map<String, StructTypeDefinition> structTypeDefinitionMap;
public static final String NAME = "name";
- public static final String PROCESS_NAME = "processName";
public static final String TIMESTAMP = "timestamp";
public static final String USER = "owned-by";
public static final String TAGS = "tag-classification";
@@ -107,8 +106,6 @@ public class FalconDataModelGenerator {
private void createProcessEntityClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
- new AttributeDefinition(PROCESS_NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
- null),
new AttributeDefinition(TIMESTAMP, DataTypes.LONG_TYPE.getName(), Multiplicity.REQUIRED, false,
null),
new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
----------------------------------------------------------------------
diff --git a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
index 4e2a06f..9b356a2 100644
--- a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
+++ b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java
@@ -150,7 +150,7 @@ public class FalconHookIT {
String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
Referenceable processEntity = atlasClient.getEntity(pid);
assertNotNull(processEntity);
- assertEquals(processEntity.get("processName"), process.getName());
+ assertEquals(processEntity.get(AtlasClient.NAME), process.getName());
Id inId = (Id) ((List)processEntity.get("inputs")).get(0);
Referenceable inEntity = atlasClient.getEntity(inId._getId());
@@ -207,7 +207,7 @@ public class FalconHookIT {
String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
Referenceable processEntity = atlasClient.getEntity(pid);
- assertEquals(processEntity.get("processName"), process.getName());
+ assertEquals(processEntity.get(AtlasClient.NAME), process.getName());
assertNull(processEntity.get("inputs"));
Id outId = (Id) ((List)processEntity.get("outputs")).get(0);
@@ -233,8 +233,8 @@ public class FalconHookIT {
private String assertProcessIsRegistered(String clusterName, String processName) throws Exception {
String name = processName + "@" + clusterName;
LOG.debug("Searching for process {}", name);
- String query = String.format("%s as t where name = '%s' select t",
- FalconDataTypes.FALCON_PROCESS_ENTITY.getName(), name);
+ String query = String.format("%s as t where %s = '%s' select t",
+ FalconDataTypes.FALCON_PROCESS_ENTITY.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
return assertEntityIsRegistered(query);
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml
index eeb2aa4..47e72e8 100755
--- a/addons/hive-bridge/pom.xml
+++ b/addons/hive-bridge/pom.xml
@@ -72,6 +72,13 @@
<dependency>
<groupId>org.apache.hive</groupId>
+ <artifactId>hive-jdbc</artifactId>
+ <version>${hive.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
<artifactId>hive-cli</artifactId>
<version>${hive.version}</version>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 5a1a36e..418e755 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -19,11 +19,14 @@
package org.apache.atlas.hive.hook;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasConstants;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.hive.rewrite.HiveASTRewriter;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
@@ -92,110 +95,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final long keepAliveTimeDefault = 10;
private static final int queueSizeDefault = 10000;
- static class HiveEventContext {
- private Set<ReadEntity> inputs;
- private Set<WriteEntity> outputs;
-
- private String user;
- private UserGroupInformation ugi;
- private HiveOperation operation;
- private HookContext.HookType hookType;
- private org.json.JSONObject jsonPlan;
- private String queryId;
- private String queryStr;
- private Long queryStartTime;
-
- private String queryType;
-
- public void setInputs(Set<ReadEntity> inputs) {
- this.inputs = inputs;
- }
-
- public void setOutputs(Set<WriteEntity> outputs) {
- this.outputs = outputs;
- }
-
- public void setUser(String user) {
- this.user = user;
- }
-
- public void setUgi(UserGroupInformation ugi) {
- this.ugi = ugi;
- }
-
- public void setOperation(HiveOperation operation) {
- this.operation = operation;
- }
-
- public void setHookType(HookContext.HookType hookType) {
- this.hookType = hookType;
- }
-
- public void setJsonPlan(JSONObject jsonPlan) {
- this.jsonPlan = jsonPlan;
- }
-
- public void setQueryId(String queryId) {
- this.queryId = queryId;
- }
-
- public void setQueryStr(String queryStr) {
- this.queryStr = queryStr;
- }
-
- public void setQueryStartTime(Long queryStartTime) {
- this.queryStartTime = queryStartTime;
- }
-
- public void setQueryType(String queryType) {
- this.queryType = queryType;
- }
-
- public Set<ReadEntity> getInputs() {
- return inputs;
- }
-
- public Set<WriteEntity> getOutputs() {
- return outputs;
- }
-
- public String getUser() {
- return user;
- }
-
- public UserGroupInformation getUgi() {
- return ugi;
- }
-
- public HiveOperation getOperation() {
- return operation;
- }
-
- public HookContext.HookType getHookType() {
- return hookType;
- }
-
- public org.json.JSONObject getJsonPlan() {
- return jsonPlan;
- }
-
- public String getQueryId() {
- return queryId;
- }
-
- public String getQueryStr() {
- return queryStr;
- }
-
- public Long getQueryStartTime() {
- return queryStartTime;
- }
-
- public String getQueryType() {
- return queryType;
- }
- }
-
private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
private static final HiveConf hiveConf;
@@ -362,7 +261,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
private void deleteTable(HiveMetaStoreBridge dgiBridge, HiveEventContext event) {
- for (WriteEntity output : event.outputs) {
+ for (WriteEntity output : event.getOutputs()) {
if (Type.TABLE.equals(output.getType())) {
deleteTable(dgiBridge, event, output);
}
@@ -380,11 +279,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
private void deleteDatabase(HiveMetaStoreBridge dgiBridge, HiveEventContext event) {
- if (event.outputs.size() > 1) {
- LOG.info("Starting deletion of tables and databases with cascade {} " , event.queryStr);
+ if (event.getOutputs().size() > 1) {
+ LOG.info("Starting deletion of tables and databases with cascade {} " , event.getQueryStr());
}
- for (WriteEntity output : event.outputs) {
+ for (WriteEntity output : event.getOutputs()) {
if (Type.TABLE.equals(output.getType())) {
deleteTable(dgiBridge, event, output);
} else if (Type.DATABASE.equals(output.getType())) {
@@ -552,13 +451,28 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return entitiesCreatedOrUpdated;
}
- public static String normalize(String str) {
+ public static String lower(String str) {
if (StringUtils.isEmpty(str)) {
return null;
}
return str.toLowerCase().trim();
}
+ public static String normalize(String queryStr) {
+ String result = null;
+ if (queryStr != null) {
+ try {
+ HiveASTRewriter rewriter = new HiveASTRewriter(hiveConf);
+ result = rewriter.rewrite(queryStr);
+ } catch (Exception e) {
+ LOG.warn("Could not rewrite query due to error. Proceeding with original query {}", queryStr, e);
+ }
+ }
+
+ result = lower(result);
+ return result;
+ }
+
private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception {
Set<ReadEntity> inputs = event.getInputs();
Set<WriteEntity> outputs = event.getOutputs();
@@ -589,7 +503,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
if (source.size() > 0 || target.size() > 0) {
- Referenceable processReferenceable = getProcessReferenceable(event,
+ Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event,
new ArrayList<Referenceable>() {{
addAll(source.values());
}},
@@ -613,7 +527,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
dataSets.put(tblQFName, inTable);
}
} else if (entity.getType() == Type.DFS_DIR) {
- final String pathUri = normalize(new Path(entity.getLocation()).toString());
+ final String pathUri = lower(new Path(entity.getLocation()).toString());
LOG.info("Registering DFS Path {} ", pathUri);
Referenceable hdfsPath = dgiBridge.fillHDFSDataSet(pathUri);
dataSets.put(pathUri, hdfsPath);
@@ -657,7 +571,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
//Refresh to get the correct location
hiveTable = dgiBridge.hiveClient.getTable(hiveTable.getDbName(), hiveTable.getTableName());
- final String location = normalize(hiveTable.getDataLocation().toString());
+ final String location = lower(hiveTable.getDataLocation().toString());
if (hiveTable != null && TableType.EXTERNAL_TABLE.equals(hiveTable.getTableType())) {
LOG.info("Registering external table process {} ", event.getQueryStr());
List<Referenceable> inputs = new ArrayList<Referenceable>() {{
@@ -668,15 +582,33 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
add(tblRef);
}};
- Referenceable processReferenceable = getProcessReferenceable(event, inputs, outputs);
+ Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, inputs, outputs);
messages.add(new HookNotification.EntityCreateRequest(event.getUser(), processReferenceable));
}
}
- private Referenceable getProcessReferenceable(HiveEventContext hiveEvent, List<Referenceable> sourceList, List<Referenceable> targetList) {
+ private boolean isCreateOp(HiveEventContext hiveEvent) {
+ if (HiveOperation.CREATETABLE.equals(hiveEvent.getOperation())
+ || HiveOperation.CREATEVIEW.equals(hiveEvent.getOperation())
+ || HiveOperation.ALTERVIEW_AS.equals(hiveEvent.getOperation())
+ || HiveOperation.CREATETABLE_AS_SELECT.equals(hiveEvent.getOperation())) {
+ return true;
+ }
+ return false;
+ }
+
+ private Referenceable getProcessReferenceable(HiveMetaStoreBridge dgiBridge, HiveEventContext hiveEvent, List<Referenceable> sourceList, List<Referenceable> targetList) {
Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
- String queryStr = normalize(hiveEvent.getQueryStr());
+ String queryStr = hiveEvent.getQueryStr();
+ if (!isCreateOp(hiveEvent)) {
+ queryStr = normalize(queryStr);
+ processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(queryStr, sourceList, targetList));
+ } else {
+ queryStr = lower(queryStr);
+ processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, queryStr);
+ }
+
LOG.debug("Registering query: {}", queryStr);
//The serialization code expected a list
@@ -686,15 +618,145 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
if (targetList != null || !targetList.isEmpty()) {
processReferenceable.set("outputs", targetList);
}
- processReferenceable.set("name", queryStr);
+ processReferenceable.set(AtlasClient.NAME, queryStr);
+
processReferenceable.set("operationType", hiveEvent.getOperation().getOperationName());
processReferenceable.set("startTime", new Date(hiveEvent.getQueryStartTime()));
processReferenceable.set("userName", hiveEvent.getUser());
processReferenceable.set("queryText", queryStr);
processReferenceable.set("queryId", hiveEvent.getQueryId());
processReferenceable.set("queryPlan", hiveEvent.getJsonPlan());
+ processReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, dgiBridge.getClusterName());
+
+ List<String> recentQueries = new ArrayList<>(1);
+ recentQueries.add(hiveEvent.getQueryStr());
+ processReferenceable.set("recentQueries", recentQueries);
processReferenceable.set("endTime", new Date(System.currentTimeMillis()));
//TODO set queryGraph
return processReferenceable;
}
+
+ @VisibleForTesting
+ static String getProcessQualifiedName(String normalizedQuery, List<Referenceable> inputs, List<Referenceable> outputs) {
+ StringBuilder buffer = new StringBuilder(normalizedQuery);
+ addDatasets(buffer, inputs);
+ addDatasets(buffer, outputs);
+ return buffer.toString();
+ }
+
+ private static void addDatasets(StringBuilder buffer, List<Referenceable> refs) {
+ if (refs != null) {
+ for (Referenceable input : refs) {
+ //TODO - Change to qualifiedName later
+ buffer.append(":");
+ String dataSetQlfdName = (String) input.get(AtlasClient.NAME);
+ buffer.append(dataSetQlfdName.toLowerCase().replaceAll("/", ""));
+ }
+ }
+ }
+
+ public static class HiveEventContext {
+ private Set<ReadEntity> inputs;
+ private Set<WriteEntity> outputs;
+
+ private String user;
+ private UserGroupInformation ugi;
+ private HiveOperation operation;
+ private HookContext.HookType hookType;
+ private JSONObject jsonPlan;
+ private String queryId;
+ private String queryStr;
+ private Long queryStartTime;
+
+ private String queryType;
+
+ public void setInputs(Set<ReadEntity> inputs) {
+ this.inputs = inputs;
+ }
+
+ public void setOutputs(Set<WriteEntity> outputs) {
+ this.outputs = outputs;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public void setUgi(UserGroupInformation ugi) {
+ this.ugi = ugi;
+ }
+
+ public void setOperation(HiveOperation operation) {
+ this.operation = operation;
+ }
+
+ public void setHookType(HookContext.HookType hookType) {
+ this.hookType = hookType;
+ }
+
+ public void setJsonPlan(JSONObject jsonPlan) {
+ this.jsonPlan = jsonPlan;
+ }
+
+ public void setQueryId(String queryId) {
+ this.queryId = queryId;
+ }
+
+ public void setQueryStr(String queryStr) {
+ this.queryStr = queryStr;
+ }
+
+ public void setQueryStartTime(Long queryStartTime) {
+ this.queryStartTime = queryStartTime;
+ }
+
+ public void setQueryType(String queryType) {
+ this.queryType = queryType;
+ }
+
+ public Set<ReadEntity> getInputs() {
+ return inputs;
+ }
+
+ public Set<WriteEntity> getOutputs() {
+ return outputs;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public UserGroupInformation getUgi() {
+ return ugi;
+ }
+
+ public HiveOperation getOperation() {
+ return operation;
+ }
+
+ public HookContext.HookType getHookType() {
+ return hookType;
+ }
+
+ public JSONObject getJsonPlan() {
+ return jsonPlan;
+ }
+
+ public String getQueryId() {
+ return queryId;
+ }
+
+ public String getQueryStr() {
+ return queryStr;
+ }
+
+ public Long getQueryStartTime() {
+ return queryStartTime;
+ }
+
+ public String getQueryType() {
+ return queryType;
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
index 7cbb1df..347405c 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
@@ -288,6 +288,8 @@ public class HiveDataModelGenerator {
new AttributeDefinition("queryPlan", DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
null),
new AttributeDefinition("queryId", DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
+ new AttributeDefinition("recentQueries", String.format("array<%s>", DataTypes.STRING_TYPE.getName()), Multiplicity.OPTIONAL, false, null),
+ new AttributeDefinition(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("queryGraph", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),};
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/ASTRewriter.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/ASTRewriter.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/ASTRewriter.java
new file mode 100644
index 0000000..3a2506b
--- /dev/null
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/ASTRewriter.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.hive.rewrite;
+
+
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+
+public interface ASTRewriter {
+
+ void rewrite(RewriteContext ctx, ASTNode node) throws RewriteException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/HiveASTRewriter.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/HiveASTRewriter.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/HiveASTRewriter.java
new file mode 100644
index 0000000..4cd219f
--- /dev/null
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/HiveASTRewriter.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.hive.rewrite;
+
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.ParseDriver;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.parse.ParseException;
+import org.apache.hadoop.hive.ql.parse.ParseUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class HiveASTRewriter {
+
+ private Context queryContext;
+ private RewriteContext rwCtx;
+ private List<ASTRewriter> rewriters = new ArrayList<>();
+
+ private static final Logger LOG = LoggerFactory.getLogger(HiveASTRewriter.class);
+
+ public HiveASTRewriter(HiveConf conf) throws RewriteException {
+ try {
+ queryContext = new Context(conf);
+ setUpRewriters();
+ } catch (IOException e) {
+ throw new RewriteException("Exception while rewriting query : " , e);
+ }
+ }
+
+ private void setUpRewriters() throws RewriteException {
+ ASTRewriter rewriter = new LiteralRewriter();
+ rewriters.add(rewriter);
+ }
+
+ public String rewrite(String sourceQry) throws RewriteException {
+ String result = sourceQry;
+ ASTNode tree = null;
+ try {
+ ParseDriver pd = new ParseDriver();
+ tree = pd.parse(sourceQry, queryContext, true);
+ tree = ParseUtils.findRootNonNullToken(tree);
+ this.rwCtx = new RewriteContext(sourceQry, tree, queryContext.getTokenRewriteStream());
+ rewrite(tree);
+ result = toSQL();
+ } catch (ParseException e) {
+ LOG.error("Could not parse the query {} ", sourceQry, e);
+ throw new RewriteException("Could not parse query : " , e);
+ }
+ return result;
+ }
+
+ private void rewrite(ASTNode origin) throws RewriteException {
+ ASTNode node = origin;
+ if (node != null) {
+ for(ASTRewriter rewriter : rewriters) {
+ rewriter.rewrite(rwCtx, node);
+ }
+ if (node.getChildren() != null) {
+ for (int i = 0; i < node.getChildren().size(); i++) {
+ rewrite((ASTNode) node.getChild(i));
+ }
+ }
+ }
+ }
+
+ public String toSQL() {
+ return rwCtx.getTokenRewriteStream().toString();
+ }
+
+ public String printAST() {
+ return rwCtx.getOriginNode().dump();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/LiteralRewriter.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/LiteralRewriter.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/LiteralRewriter.java
new file mode 100644
index 0000000..789b981
--- /dev/null
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/LiteralRewriter.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.hive.rewrite;
+
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.HiveParser;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class LiteralRewriter implements ASTRewriter {
+
+ public static Map<Integer, String> LITERAL_TOKENS = new HashMap<Integer, String>() {{
+ put(HiveParser.Number, "NUMBER_LITERAL");
+ put(HiveParser.Digit, "DIGIT_LITERAL");
+ put(HiveParser.HexDigit, "HEX_LITERAL");
+ put(HiveParser.Exponent, "EXPONENT_LITERAL");
+ put(HiveParser.StringLiteral, "'STRING_LITERAL'");
+ put(HiveParser.BigintLiteral, "BIGINT_LITERAL");
+ put(HiveParser.SmallintLiteral, "SMALLINT_LITERAL");
+ put(HiveParser.TinyintLiteral, "TINYINT_LITERAL");
+ put(HiveParser.DecimalLiteral, "DECIMAL_LITERAL");
+ put(HiveParser.ByteLengthLiteral, "BYTE_LENGTH_LITERAL");
+ put(HiveParser.TOK_STRINGLITERALSEQUENCE, "'STRING_LITERAL_SEQ'");
+ put(HiveParser.TOK_CHARSETLITERAL, "'CHARSET_LITERAL'");
+ put(HiveParser.KW_TRUE, "BOOLEAN_LITERAL");
+ put(HiveParser.KW_FALSE, "BOOLEAN_LITERAL");
+ }};
+
+
+ @Override
+ public void rewrite(RewriteContext ctx, final ASTNode node) throws RewriteException {
+ try {
+ processLiterals(ctx, node);
+ } catch(Exception e) {
+ throw new RewriteException("Could not normalize query", e);
+ }
+ }
+
+
+ private void processLiterals(final RewriteContext ctx, final ASTNode node) {
+ // Take child ident.totext
+ if (isLiteral(node)) {
+ replaceLiteral(ctx, node);
+ }
+ }
+
+ private boolean isLiteral(ASTNode node) {
+ if (LITERAL_TOKENS.containsKey(node.getType())) {
+ return true;
+ }
+ return false;
+ }
+
+ void replaceLiteral(RewriteContext ctx, ASTNode valueNode) {
+ //Reset the token stream
+ String literalVal = LITERAL_TOKENS.get(valueNode.getType());
+ ctx.getTokenRewriteStream().replace(valueNode.getTokenStartIndex(),
+ valueNode.getTokenStopIndex(), literalVal);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteContext.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteContext.java
new file mode 100644
index 0000000..505616e
--- /dev/null
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteContext.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.hive.rewrite;
+
+import org.antlr.runtime.TokenRewriteStream;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+
+public class RewriteContext {
+
+ private String origQuery;
+
+ private TokenRewriteStream rewriteStream;
+
+ private ASTNode origin;
+
+ RewriteContext(String origQuery, ASTNode origin, TokenRewriteStream rewriteStream) {
+ this.origin = origin;
+ this.rewriteStream = rewriteStream;
+ }
+
+ public TokenRewriteStream getTokenRewriteStream() {
+ return rewriteStream;
+ }
+
+ public ASTNode getOriginNode() {
+ return origin;
+ }
+
+ public String getOriginalQuery() {
+ return origQuery;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteException.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteException.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteException.java
new file mode 100644
index 0000000..79a1afe
--- /dev/null
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/rewrite/RewriteException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.hive.rewrite;
+
+import org.apache.hadoop.hive.ql.parse.ParseException;
+
+public class RewriteException extends Exception {
+ public RewriteException(final String message, final Exception exception) {
+ super(message, exception);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveLiteralRewriterTest.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveLiteralRewriterTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveLiteralRewriterTest.java
new file mode 100644
index 0000000..2840457
--- /dev/null
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveLiteralRewriterTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.hive.bridge;
+
+import org.apache.atlas.hive.hook.HiveHook;
+import org.apache.atlas.hive.rewrite.HiveASTRewriter;
+import org.apache.atlas.hive.rewrite.RewriteException;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class HiveLiteralRewriterTest {
+
+ private HiveConf conf;
+
+ @BeforeClass
+ public void setup() {
+ conf = new HiveConf();
+ conf.addResource("/hive-site.xml");
+ SessionState ss = new SessionState(conf, "testuser");
+ SessionState.start(ss);
+ conf.set("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager");
+ }
+
+ @Test
+ public void testLiteralRewrite() throws RewriteException {
+ HiveHook.HiveEventContext ctx = new HiveHook.HiveEventContext();
+ ctx.setQueryStr("insert into table testTable partition(dt='2014-01-01') select * from test1 where dt = '2014-01-01'" +
+ " and intColumn = 10" +
+ " and decimalColumn = 1.10" +
+ " and charColumn = 'a'" +
+ " and hexColumn = unhex('\\0xFF')" +
+ " and expColumn = cast('-1.5e2' as int)" +
+ " and boolCol = true");
+
+ HiveASTRewriter queryRewriter = new HiveASTRewriter(conf);
+ String result = queryRewriter.rewrite(ctx.getQueryStr());
+ System.out.println("normlized sql : " + result);
+
+ final String normalizedSQL = "insert into table testTable partition(dt='STRING_LITERAL') " +
+ "select * from test1 where dt = 'STRING_LITERAL' " +
+ "and intColumn = NUMBER_LITERAL " +
+ "and decimalColumn = NUMBER_LITERAL and " +
+ "charColumn = 'STRING_LITERAL' and " +
+ "hexColumn = unhex('STRING_LITERAL') and " +
+ "expColumn = cast('STRING_LITERAL' as int) and " +
+ "boolCol = BOOLEAN_LITERAL";
+ Assert.assertEquals(result, normalizedSQL);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index 43bba0e..70100f1 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -28,6 +28,8 @@ import org.apache.atlas.fs.model.FSDataTypes;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.hive.rewrite.HiveASTRewriter;
+import org.apache.atlas.hive.rewrite.RewriteException;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.persistence.Id;
@@ -56,11 +58,13 @@ import org.testng.annotations.Test;
import java.io.File;
import java.text.ParseException;
+import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.atlas.hive.hook.HiveHook.lower;
import static org.apache.atlas.hive.hook.HiveHook.normalize;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@@ -76,6 +80,8 @@ public class HiveHookIT {
private AtlasClient atlasClient;
private HiveMetaStoreBridge hiveMetaStoreBridge;
private SessionState ss;
+
+ private HiveConf conf;
private static final String INPUTS = AtlasClient.PROCESS_ATTRIBUTE_INPUTS;
private static final String OUTPUTS = AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS;
@@ -83,10 +89,7 @@ public class HiveHookIT {
@BeforeClass
public void setUp() throws Exception {
//Set-up hive session
- HiveConf conf = new HiveConf();
- //Run in local mode
- conf.set("mapreduce.framework.name", "local");
- conf.set("fs.default.name", "file:///'");
+ conf = new HiveConf();
conf.setClassLoader(Thread.currentThread().getContextClassLoader());
driver = new Driver(conf);
ss = new SessionState(conf);
@@ -98,7 +101,6 @@ public class HiveHookIT {
hiveMetaStoreBridge = new HiveMetaStoreBridge(conf, atlasClient);
hiveMetaStoreBridge.registerHiveDataModel();
-
}
private void runCommand(String cmd) throws Exception {
@@ -231,36 +233,36 @@ public class HiveHookIT {
@Test
public void testCreateExternalTable() throws Exception {
String tableName = tableName();
- String dbName = createDatabase();
String colName = columnName();
String pFile = createTestDFSPath("parentPath");
- final String query = String.format("create TEMPORARY EXTERNAL table %s.%s( %s, %s) location '%s'", dbName , tableName , colName + " int", "name string", pFile);
+ final String query = String.format("create TEMPORARY EXTERNAL table %s.%s( %s, %s) location '%s'", DEFAULT_DB , tableName , colName + " int", "name string", pFile);
runCommand(query);
- String tableId = assertTableIsRegistered(dbName, tableName, null, true);
+ assertTableIsRegistered(DEFAULT_DB, tableName, null, true);
- Referenceable processReference = validateProcess(query, 1, 1);
+ String processId = assertProcessIsRegistered(query);
+ Referenceable processReference = atlasClient.getEntity(processId);
assertEquals(processReference.get("userName"), UserGroupInformation.getCurrentUser().getShortUserName());
verifyTimestamps(processReference, "startTime");
verifyTimestamps(processReference, "endTime");
validateHDFSPaths(processReference, pFile, INPUTS);
- validateOutputTables(processReference, tableId);
}
- private void validateOutputTables(Referenceable processReference, String... expectedTableGuids) throws Exception {
- validateTables(processReference, OUTPUTS, expectedTableGuids);
+ private void validateOutputTables(Referenceable processReference, String... expectedTableNames) throws Exception {
+ validateTables(processReference, OUTPUTS, expectedTableNames);
}
- private void validateInputTables(Referenceable processReference, String... expectedTableGuids) throws Exception {
- validateTables(processReference, INPUTS, expectedTableGuids);
+ private void validateInputTables(Referenceable processReference, String... expectedTableNames) throws Exception {
+ validateTables(processReference, INPUTS, expectedTableNames);
}
- private void validateTables(Referenceable processReference, String attrName, String... expectedTableGuids) throws Exception {
+ private void validateTables(Referenceable processReference, String attrName, String... expectedTableNames) throws Exception {
List<Id> tableRef = (List<Id>) processReference.get(attrName);
- for(int i = 0; i < expectedTableGuids.length; i++) {
- Assert.assertEquals(tableRef.get(i)._getId(), expectedTableGuids[i]);
+ for(int i = 0; i < expectedTableNames.length; i++) {
+ Referenceable entity = atlasClient.getEntity(tableRef.get(i)._getId());
+ Assert.assertEquals(entity.get(AtlasClient.NAME), expectedTableNames[i]);
}
}
@@ -371,7 +373,7 @@ public class HiveHookIT {
String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName;
runCommand(query);
- assertProcessIsRegistered(query);
+ assertProcessIsRegistered(query, null, getQualifiedTblName(tableName));
}
@Test
@@ -382,7 +384,7 @@ public class HiveHookIT {
String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')";
runCommand(query);
- validateProcess(query, 0, 1);
+ validateProcess(query, null, getQualifiedTblName(tableName));
}
@Test
@@ -392,49 +394,42 @@ public class HiveHookIT {
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
String loadFile = createTestDFSFile("loadDFSFile");
+ final String testPathNormed = lower(new Path(loadFile).toString());
String query = "load data inpath '" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')";
runCommand(query);
- Referenceable processReference = validateProcess(query, 1, 1);
+ final String tblQlfdName = getQualifiedTblName(tableName);
+ Referenceable processReference = validateProcess(query, testPathNormed, tblQlfdName);
validateHDFSPaths(processReference, loadFile, INPUTS);
- validateOutputTables(processReference, tableId);
+ validateOutputTables(processReference, tblQlfdName);
}
- private Referenceable validateProcess(String query, int numInputs, int numOutputs) throws Exception {
- String processId = assertProcessIsRegistered(query);
- Referenceable process = atlasClient.getEntity(processId);
- if (numInputs == 0) {
- Assert.assertNull(process.get(INPUTS));
- } else {
- Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), numInputs);
- }
+ private String getQualifiedTblName(String inputTable) {
+ String inputtblQlfdName = inputTable;
- if (numOutputs == 0) {
- Assert.assertNull(process.get(OUTPUTS));
- } else {
- Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), numOutputs);
+ if (inputTable != null && !inputTable.contains(".")) {
+ inputtblQlfdName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, inputTable);
}
-
- return process;
+ return inputtblQlfdName;
}
- private Referenceable validateProcess(String query, String[] inputs, String[] outputs) throws Exception {
- String processId = assertProcessIsRegistered(query);
+ private Referenceable validateProcess(String query, String inputTable, String outputTable) throws Exception {
+ String processId = assertProcessIsRegistered(query, inputTable, outputTable);
Referenceable process = atlasClient.getEntity(processId);
- if (inputs == null) {
+ if (inputTable == null) {
Assert.assertNull(process.get(INPUTS));
} else {
- Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), inputs.length);
- validateInputTables(process, inputs);
+ Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), 1);
+ validateInputTables(process, inputTable);
}
- if (outputs == null) {
+ if (outputTable == null) {
Assert.assertNull(process.get(OUTPUTS));
} else {
- Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), outputs.length);
- validateOutputTables(process, outputs);
+ Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), 1 );
+ validateOutputTables(process, outputTable);
}
return process;
@@ -452,7 +447,14 @@ public class HiveHookIT {
String inputTableId = assertTableIsRegistered(DEFAULT_DB, tableName);
String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName);
- validateProcess(query, new String[]{inputTableId}, new String[]{opTableId});
+ Referenceable processRef1 = validateProcess(query, getQualifiedTblName(tableName), getQualifiedTblName(insertTableName));
+
+ //Rerun same query. Should result in same process
+ runCommand(query);
+
+ Referenceable processRef2 = validateProcess(query, getQualifiedTblName(tableName), getQualifiedTblName(insertTableName));
+ Assert.assertEquals(processRef1.getId()._getId(), processRef2.getId()._getId());
+
}
@Test
@@ -463,7 +465,7 @@ public class HiveHookIT {
"insert overwrite LOCAL DIRECTORY '" + randomLocalPath.getAbsolutePath() + "' select id, name from " + tableName;
runCommand(query);
- validateProcess(query, 1, 0);
+ validateProcess(query, getQualifiedTblName(tableName), null);
assertTableIsRegistered(DEFAULT_DB, tableName);
}
@@ -471,17 +473,33 @@ public class HiveHookIT {
@Test
public void testInsertIntoDFSDir() throws Exception {
String tableName = createTable();
- String pFile = createTestDFSPath("somedfspath");
+ String pFile1 = createTestDFSPath("somedfspath1");
+ String testPathNormed = lower(new Path(pFile1).toString());
String query =
- "insert overwrite DIRECTORY '" + pFile + "' select id, name from " + tableName;
+ "insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName;
runCommand(query);
- Referenceable processReference = validateProcess(query, 1, 1);
- validateHDFSPaths(processReference, pFile, OUTPUTS);
+ String tblQlfdname = getQualifiedTblName(tableName);
+ Referenceable processReference = validateProcess(query, tblQlfdname, testPathNormed);
+ validateHDFSPaths(processReference, pFile1, OUTPUTS);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
- validateInputTables(processReference, tableId);
+ validateInputTables(processReference, tblQlfdname);
+
+ //Rerun same query with different HDFS path
+
+ String pFile2 = createTestDFSPath("somedfspath2");
+ testPathNormed = lower(new Path(pFile2).toString());
+ query =
+ "insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName;
+
+ runCommand(query);
+ tblQlfdname = getQualifiedTblName(tableName);
+ Referenceable process2Reference = validateProcess(query, tblQlfdname, testPathNormed);
+ validateHDFSPaths(process2Reference, pFile2, OUTPUTS);
+
+ Assert.assertNotEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
}
@Test
@@ -495,11 +513,10 @@ public class HiveHookIT {
"insert into " + insertTableName + " select id, name from " + tableName;
runCommand(query);
- validateProcess(query, 1, 1);
+ validateProcess(query, getQualifiedTblName(tableName), getQualifiedTblName(insertTableName + HiveMetaStoreBridge.TEMP_TABLE_PREFIX + SessionState.get().getSessionId()));
- String ipTableId = assertTableIsRegistered(DEFAULT_DB, tableName);
- String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName, null, true);
- validateProcess(query, new String[] {ipTableId}, new String[] {opTableId});
+ assertTableIsRegistered(DEFAULT_DB, tableName);
+ assertTableIsRegistered(DEFAULT_DB, insertTableName, null, true);
}
@Test
@@ -510,11 +527,10 @@ public class HiveHookIT {
"insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from " + tableName
+ " where dt = '2015-01-01'";
runCommand(query);
- validateProcess(query, 1, 1);
+ validateProcess(query, getQualifiedTblName(tableName) , getQualifiedTblName(insertTableName));
- String ipTableId = assertTableIsRegistered(DEFAULT_DB, tableName);
- String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName);
- validateProcess(query, new String[]{ipTableId}, new String[]{opTableId});
+ assertTableIsRegistered(DEFAULT_DB, tableName);
+ assertTableIsRegistered(DEFAULT_DB, insertTableName);
}
private String random() {
@@ -543,10 +559,12 @@ public class HiveHookIT {
String filename = "pfile://" + mkdir("export");
String query = "export table " + tableName + " to \"" + filename + "\"";
+ final String testPathNormed = lower(new Path(filename).toString());
runCommand(query);
- Referenceable processReference = validateProcess(query, 1, 1);
+ String tblQlfName = getQualifiedTblName(tableName);
+ Referenceable processReference = validateProcess(query, tblQlfName, testPathNormed);
validateHDFSPaths(processReference, filename, OUTPUTS);
- validateInputTables(processReference, tableId);
+ validateInputTables(processReference, tblQlfName);
//Import
tableName = createTable(false);
@@ -554,10 +572,11 @@ public class HiveHookIT {
query = "import table " + tableName + " from '" + filename + "'";
runCommand(query);
- processReference = validateProcess(query, 1, 1);
+ tblQlfName = getQualifiedTblName(tableName);
+ processReference = validateProcess(query, testPathNormed, tblQlfName);
validateHDFSPaths(processReference, filename, INPUTS);
- validateOutputTables(processReference, tableId);
+ validateOutputTables(processReference, tblQlfName);
}
@Test
@@ -571,12 +590,14 @@ public class HiveHookIT {
runCommand(query);
String filename = "pfile://" + mkdir("export");
+ final String testPathNormed = lower(new Path(filename).toString());
query = "export table " + tableName + " to \"" + filename + "\"";
runCommand(query);
- Referenceable processReference = validateProcess(query, 1, 1);
+ String tblQlfdName = getQualifiedTblName(tableName);
+ Referenceable processReference = validateProcess(query, tblQlfdName, testPathNormed);
validateHDFSPaths(processReference, filename, OUTPUTS);
- validateInputTables(processReference, tableId);
+ validateInputTables(processReference, tblQlfdName);
//Import
tableName = createTable(true);
@@ -584,10 +605,11 @@ public class HiveHookIT {
query = "import table " + tableName + " from '" + filename + "'";
runCommand(query);
- processReference = validateProcess(query, 1, 1);
+ tblQlfdName = getQualifiedTblName(tableName);
+ processReference = validateProcess(query, testPathNormed, tblQlfdName);
validateHDFSPaths(processReference, filename, INPUTS);
- validateOutputTables(processReference, tableId);
+ validateOutputTables(processReference, tblQlfdName);
}
@Test
@@ -750,7 +772,7 @@ public class HiveHookIT {
});
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
- HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
+ HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
//Change name and add comment
oldColName = "name2";
@@ -847,8 +869,9 @@ public class HiveHookIT {
String query = String.format("truncate table %s", tableName);
runCommand(query);
+
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
- validateProcess(query, 0, 1);
+ validateProcess(query, null, getQualifiedTblName(tableName));
//Check lineage
String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
@@ -916,16 +939,17 @@ public class HiveHookIT {
}
});
- Referenceable processReference = validateProcess(query, 1, 1);
- validateHDFSPaths(processReference, testPath, INPUTS);
+ final String tblQlfdName = getQualifiedTblName(tableName);
- validateOutputTables(processReference, tableId);
+ final String testPathNormed = lower(new Path(testPath).toString());
+ Referenceable processReference = validateProcess(query, testPathNormed, tblQlfdName);
+ validateHDFSPaths(processReference, testPath, INPUTS);
}
private String validateHDFSPaths(Referenceable processReference, String testPath, String attributeName) throws Exception {
List<Id> hdfsPathRefs = (List<Id>) processReference.get(attributeName);
- final String testPathNormed = normalize(new Path(testPath).toString());
+ final String testPathNormed = lower(new Path(testPath).toString());
String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed);
Assert.assertEquals(hdfsPathRefs.get(0)._getId(), hdfsPathId);
@@ -1083,7 +1107,7 @@ public class HiveHookIT {
//Verify columns are not registered for one of the tables
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
- HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), "id"));
+ HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), "id"));
assertColumnIsNotRegistered(HiveMetaStoreBridge
.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]),
HiveDataModelGenerator.NAME));
@@ -1316,14 +1340,55 @@ public class HiveHookIT {
}
}
- private String assertProcessIsRegistered(String queryStr) throws Exception {
- LOG.debug("Searching for process with query {}", queryStr);
- return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.NAME, normalize(queryStr), null);
+ private String assertProcessIsRegistered(final String queryStr, final String inputTblName, final String outputTblName) throws Exception {
+
+ HiveASTRewriter astRewriter = new HiveASTRewriter(conf);
+ String normalizedQuery = normalize(astRewriter.rewrite(queryStr));
+
+ List<Referenceable> inputs = null;
+
+ if (inputTblName != null) {
+ Referenceable inputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{
+ put(HiveDataModelGenerator.NAME, inputTblName);
+ }});
+ inputs = new ArrayList<Referenceable>();
+ inputs.add(inputTableRef);
+ }
+ List<Referenceable> outputs = null;
+ if (outputTblName != null) {
+ Referenceable outputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{
+ put(HiveDataModelGenerator.NAME, outputTblName);
+ }});
+
+ outputs = new ArrayList<Referenceable>();
+ outputs.add(outputTableRef);
+ }
+ String processQFName = HiveHook.getProcessQualifiedName(normalizedQuery, inputs, outputs);
+ LOG.debug("Searching for process with query {}", processQFName);
+ return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() {
+ @Override
+ public void assertOnEntity(final Referenceable entity) throws Exception {
+ List<String> recentQueries = (List<String>) entity.get("recentQueries");
+ Assert.assertEquals(recentQueries.get(0), queryStr);
+ }
+ });
+ }
+
+ private String assertProcessIsRegistered(final String queryStr) throws Exception {
+ String lowerQryStr = lower(queryStr);
+ LOG.debug("Searching for process with query {}", lowerQryStr);
+ return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, lowerQryStr, new AssertPredicate() {
+ @Override
+ public void assertOnEntity(final Referenceable entity) throws Exception {
+ List<String> recentQueries = (List<String>) entity.get("recentQueries");
+ Assert.assertEquals(recentQueries.get(0), queryStr);
+ }
+ });
}
private void assertProcessIsNotRegistered(String queryStr) throws Exception {
LOG.debug("Searching for process with query {}", queryStr);
- assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.NAME, normalize(queryStr));
+ assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, normalize(queryStr));
}
private void assertTableIsNotRegistered(String dbName, String tableName, boolean isTemporaryTable) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/hive-bridge/src/test/resources/hive-site.xml
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/resources/hive-site.xml b/addons/hive-bridge/src/test/resources/hive-site.xml
index f1facb8..058e546 100644
--- a/addons/hive-bridge/src/test/resources/hive-site.xml
+++ b/addons/hive-bridge/src/test/resources/hive-site.xml
@@ -17,6 +17,16 @@
<configuration>
<property>
+ <name>mapreduce.framework.name</name>
+ <value>local</value>
+ </property>
+
+ <property>
+ <name>fs.default.name</name>
+ <value>file:///</value>
+ </property>
+
+ <property>
<name>hive.exec.post.hooks</name>
<value>org.apache.atlas.hive.hook.HiveHook</value>
</property>
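The two properties added above are standard Hadoop settings; presumably they let the hive-bridge integration tests run entirely in-process without an external cluster. A rough programmatic equivalent (illustrative only, variable names are made up):

    import org.apache.hadoop.hive.conf.HiveConf;

    HiveConf conf = new HiveConf();
    conf.set("mapreduce.framework.name", "local");  // run MapReduce jobs in the local runner, no YARN needed
    conf.set("fs.default.name", "file:///");        // older key for fs.defaultFS: use the local filesystem, no HDFS needed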
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
index ab7e6ee..18474ad 100644
--- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
+++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
@@ -105,7 +105,9 @@ public class SqoopHook extends SqoopJobDataPublisher {
private Referenceable createSqoopProcessInstance(Referenceable dbStoreRef, Referenceable hiveTableRef,
SqoopJobDataPublisher.Data data, String clusterName) {
Referenceable procRef = new Referenceable(SqoopDataTypes.SQOOP_PROCESS.getName());
- procRef.set(SqoopDataModelGenerator.NAME, getSqoopProcessName(data, clusterName));
+ final String sqoopProcessName = getSqoopProcessName(data, clusterName);
+ procRef.set(SqoopDataModelGenerator.NAME, sqoopProcessName);
+ procRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName);
procRef.set(SqoopDataModelGenerator.OPERATION, data.getOperation());
procRef.set(SqoopDataModelGenerator.INPUTS, dbStoreRef);
procRef.set(SqoopDataModelGenerator.OUTPUTS, hiveTableRef);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java b/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
index 2820169..a81ee15 100644
--- a/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
+++ b/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java
@@ -107,8 +107,8 @@ public class SqoopHookIT {
private String assertSqoopProcessIsRegistered(String processName) throws Exception {
LOG.debug("Searching for sqoop process {}", processName);
String query = String.format(
- "%s as t where name = '%s' select t",
- SqoopDataTypes.SQOOP_PROCESS.getName(), processName);
+ "%s as t where %s = '%s' select t",
+ SqoopDataTypes.SQOOP_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processName);
return assertEntityIsRegistered(query);
}
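For illustration, the DSL query the assertion now issues, with hypothetical values standing in for the constants (SQOOP_PROCESS.getName() and REFERENCEABLE_ATTRIBUTE_NAME, the latter resolving to the unique qualifiedName attribute), so the lookup keys on the process's qualified name rather than its display name:

    String query = String.format(
            "%s as t where %s = '%s' select t",
            "sqoop_process",                    // SqoopDataTypes.SQOOP_PROCESS.getName(), assumed value
            "qualifiedName",                    // AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME
            "sqoop import --table EMP");        // hypothetical process name
    // -> sqoop_process as t where qualifiedName = 'sqoop import --table EMP' select t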
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index 267e228..4448105 100644
--- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -110,7 +110,8 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
Referenceable topologyReferenceable = new Referenceable(
StormDataTypes.STORM_TOPOLOGY.getName());
topologyReferenceable.set("id", topologyInfo.get_id());
- topologyReferenceable.set("name", topologyInfo.get_name());
+ topologyReferenceable.set(AtlasClient.NAME, topologyInfo.get_name());
+ topologyReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, topologyInfo.get_name());
String owner = topologyInfo.get_owner();
if (StringUtils.isEmpty(owner)) {
owner = ANONYMOUS_OWNER;
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/addons/storm-bridge/src/main/scala/org/apache/atlas/storm/model/StormDataModel.scala
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/main/scala/org/apache/atlas/storm/model/StormDataModel.scala b/addons/storm-bridge/src/main/scala/org/apache/atlas/storm/model/StormDataModel.scala
index de67c39..a982e61 100644
--- a/addons/storm-bridge/src/main/scala/org/apache/atlas/storm/model/StormDataModel.scala
+++ b/addons/storm-bridge/src/main/scala/org/apache/atlas/storm/model/StormDataModel.scala
@@ -18,6 +18,7 @@
package org.apache.atlas.storm.model
+import org.apache.atlas.AtlasClient
import org.apache.atlas.typesystem.TypesDef
import org.apache.atlas.typesystem.builders.TypesBuilder
import org.apache.atlas.typesystem.json.TypesSerialization
@@ -42,7 +43,7 @@ object StormDataModel extends App {
* Also, Topology contains the Graph of Nodes
* Topology => Node(s) -> Spouts/Bolts
*/
- _class(StormDataTypes.STORM_TOPOLOGY.getName, List("Process")) {
+ _class(StormDataTypes.STORM_TOPOLOGY.getName, List(AtlasClient.PROCESS_SUPER_TYPE)) {
"id" ~ (string, required, indexed, unique)
"description" ~ (string, optional, indexed)
"owner" ~ (string, required, indexed)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index fe79005..dcaeecd 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -3,6 +3,7 @@ Apache Atlas Release Notes
--trunk - unreleased
INCOMPATIBLE CHANGES:
+ATLAS-619 Canonicalize hive queries (sumasai)
ATLAS-497 Simple Authorization (saqeeb.s via yhemanth)
ATLAS-661 REST API Authentication (nixonrodrigues via yhemanth)
ATLAS-672 UI: Make dashboard v2 the default UI implementation (bergenholtz via yhemanth)
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
index 9f69940..5195cbe 100755
--- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
+++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
@@ -182,20 +182,21 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
DESCRIPTION_ATTRIBUTE);
createType(datasetType);
- HierarchicalTypeDefinition<ClassType> processType = TypesUtil
- .createClassTypeDef(AtlasClient.PROCESS_SUPER_TYPE, ImmutableSet.<String>of(), NAME_ATTRIBUTE,
- DESCRIPTION_ATTRIBUTE,
- new AttributeDefinition("inputs", DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
- Multiplicity.OPTIONAL, false, null),
- new AttributeDefinition("outputs", DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
- Multiplicity.OPTIONAL, false, null));
- createType(processType);
-
HierarchicalTypeDefinition<ClassType> referenceableType = TypesUtil
.createClassTypeDef(AtlasClient.REFERENCEABLE_SUPER_TYPE, ImmutableSet.<String>of(),
TypesUtil.createUniqueRequiredAttrDef(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
DataTypes.STRING_TYPE));
createType(referenceableType);
+
+ HierarchicalTypeDefinition<ClassType> processType = TypesUtil
+ .createClassTypeDef(AtlasClient.PROCESS_SUPER_TYPE, ImmutableSet.<String>of(AtlasClient.REFERENCEABLE_SUPER_TYPE),
+ TypesUtil.createRequiredAttrDef(AtlasClient.NAME, DataTypes.STRING_TYPE),
+ DESCRIPTION_ATTRIBUTE,
+ new AttributeDefinition("inputs", DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
+ Multiplicity.OPTIONAL, false, null),
+ new AttributeDefinition("outputs", DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
+ Multiplicity.OPTIONAL, false, null));
+ createType(processType);
}
private void createType(HierarchicalTypeDefinition<ClassType> type) throws AtlasException {
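The reordering above is required because the Process super type now lists Referenceable as its parent, so the Referenceable type must be registered first. It also means every process entity carries the unique qualifiedName attribute in addition to its display name, which is the pattern repeated in the hook and test changes below. A minimal sketch of that pattern (the type name and attribute values are hypothetical):

    Referenceable loadProcess = new Referenceable("LoadProcess");                  // any subtype of Process
    loadProcess.set(AtlasClient.NAME, "loadSalesDaily");                           // display name
    loadProcess.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, "loadSalesDaily");   // unique qualifiedName, now required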
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
index 66e1365..40f0d91 100644
--- a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
+++ b/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
@@ -331,7 +331,8 @@ public class BaseHiveRepositoryTest {
String queryText, String queryPlan, String queryId, String queryGraph, String... traitNames)
throws Exception {
Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
- referenceable.set("name", name);
+ referenceable.set(AtlasClient.NAME, name);
+ referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
referenceable.set("description", description);
referenceable.set("user", user);
referenceable.set("startTime", System.currentTimeMillis());
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java b/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java
index 70dce6b..79feb39 100755
--- a/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java
+++ b/webapp/src/main/java/org/apache/atlas/examples/QuickStart.java
@@ -376,7 +376,8 @@ public class QuickStart {
throws Exception {
Referenceable referenceable = new Referenceable(LOAD_PROCESS_TYPE, traitNames);
// super type attributes
- referenceable.set("name", name);
+ referenceable.set(AtlasClient.NAME, name);
+ referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
referenceable.set("description", description);
referenceable.set(INPUTS_ATTRIBUTE, inputTables);
referenceable.set(OUTPUTS_ATTRIBUTE, outputTables);
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java b/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java
index cdf6049..2912464 100644
--- a/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java
+++ b/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java
@@ -18,6 +18,8 @@
package org.apache.atlas.examples;
+import org.apache.atlas.Atlas;
+import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
@@ -94,10 +96,10 @@ public class QuickStartIT extends BaseResourceIT {
@Test
public void testProcessIsAdded() throws AtlasServiceException, JSONException {
- Referenceable loadProcess = serviceClient.getEntity(QuickStart.LOAD_PROCESS_TYPE, "name",
+ Referenceable loadProcess = serviceClient.getEntity(QuickStart.LOAD_PROCESS_TYPE, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
QuickStart.LOAD_SALES_DAILY_PROCESS);
- assertEquals(QuickStart.LOAD_SALES_DAILY_PROCESS, loadProcess.get("name"));
+ assertEquals(QuickStart.LOAD_SALES_DAILY_PROCESS, loadProcess.get(AtlasClient.NAME));
assertEquals(QuickStart.LOAD_SALES_DAILY_PROCESS_DESCRIPTION, loadProcess.get("description"));
List<Id> inputs = (List<Id>)loadProcess.get(QuickStart.INPUTS_ATTRIBUTE);
@@ -141,12 +143,12 @@ public class QuickStartIT extends BaseResourceIT {
@Test
public void testViewIsAdded() throws AtlasServiceException, JSONException {
- Referenceable view = serviceClient.getEntity(QuickStart.VIEW_TYPE, "name", QuickStart.PRODUCT_DIM_VIEW);
+ Referenceable view = serviceClient.getEntity(QuickStart.VIEW_TYPE, AtlasClient.NAME, QuickStart.PRODUCT_DIM_VIEW);
- assertEquals(QuickStart.PRODUCT_DIM_VIEW, view.get("name"));
+ assertEquals(QuickStart.PRODUCT_DIM_VIEW, view.get(AtlasClient.NAME));
Id productDimId = getTable(QuickStart.PRODUCT_DIM_TABLE).getId();
- Id inputTableId = ((List<Id>)view.get(QuickStart.INPUT_TABLES_ATTRIBUTE)).get(0);
+ Id inputTableId = ((List<Id>) view.get(QuickStart.INPUT_TABLES_ATTRIBUTE)).get(0);
assertEquals(productDimId, inputTableId);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9e1f3663/webapp/src/test/java/org/apache/atlas/web/resources/HiveLineageJerseyResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/HiveLineageJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/HiveLineageJerseyResourceIT.java
index bc02f90..0fb5ea2 100644
--- a/webapp/src/test/java/org/apache/atlas/web/resources/HiveLineageJerseyResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/HiveLineageJerseyResourceIT.java
@@ -184,7 +184,8 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
table("sales_fact_daily_mv" + randomString(), "sales fact daily materialized view", reportingDB,
"Joe BI", "MANAGED", salesFactColumns, "Metric");
- loadProcess("loadSalesDaily" + randomString(), "John ETL", ImmutableList.of(salesFact, timeDim),
+ String procName = "loadSalesDaily" + randomString();
+ loadProcess(procName, "John ETL", ImmutableList.of(salesFact, timeDim),
ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL");
salesMonthlyTable = "sales_fact_monthly_mv" + randomString();
@@ -237,7 +238,8 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
Id loadProcess(String name, String user, List<Id> inputTables, List<Id> outputTables, String queryText,
String queryPlan, String queryId, String queryGraph, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
- referenceable.set("name", name);
+ referenceable.set(AtlasClient.NAME, name);
+ referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
referenceable.set("user", user);
referenceable.set("startTime", System.currentTimeMillis());
referenceable.set("endTime", System.currentTimeMillis() + 10000);