You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2020/04/09 12:14:05 UTC

[nifi] branch master updated: NIFI-7345: Fixed Hive database and table names case insensitivity in Atlas reporting task

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 08dcd4a  NIFI-7345: Fixed Hive database and table names case insensitivity in Atlas reporting task
08dcd4a is described below

commit 08dcd4af14386183d0cd84690772b5ad93f75b9f
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Wed Apr 8 23:27:50 2020 +0200

    NIFI-7345: Fixed Hive database and table names case insensitivity in Atlas reporting task
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4198.
---
 .../provenance/analyzer/DatabaseAnalyzerUtil.java  |  3 +-
 .../nifi/atlas/provenance/analyzer/Hive2JDBC.java  |  4 +-
 .../atlas/provenance/analyzer/TestHive2JDBC.java   | 68 +++++++++++-----------
 .../provenance/analyzer/TestPutHiveStreaming.java  |  6 +-
 4 files changed, 41 insertions(+), 40 deletions(-)

diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/DatabaseAnalyzerUtil.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/DatabaseAnalyzerUtil.java
index 63ab1bf..94db2c8 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/DatabaseAnalyzerUtil.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/DatabaseAnalyzerUtil.java
@@ -52,7 +52,8 @@ public class DatabaseAnalyzerUtil {
         }
         final String databaseName = tableNameSplit.length == 2 ? tableNameSplit[0] : connectedDatabaseName;
         final String tableName = tableNameSplit.length == 2 ? tableNameSplit[1] : tableNameSplit[0];
-        return new Tuple<>(databaseName, tableName);
+        // Handle case insensitivity of database and table names in Hive: send names uniformly in lower case
+        return new Tuple<>(databaseName.toLowerCase(), tableName.toLowerCase());
     }
 
     public static String toTableNameStr(Tuple<String, String> tableName) {
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
index ffed41f..7821f86 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
@@ -94,8 +94,8 @@ public class Hive2JDBC extends AbstractHiveAnalyzer {
 
         if (inputTables.isEmpty() && outputTables.isEmpty()) {
             // If input/output tables are unknown, create database level lineage.
-            return getDatabaseRef(event.getComponentId(), event.getEventType(),
-                    clusterName, connectedDatabaseName);
+            // Handle case insensitivity of database and table names in Hive: send names uniformly in lower case
+            return getDatabaseRef(event.getComponentId(), event.getEventType(), clusterName, connectedDatabaseName.toLowerCase());
         }
 
         final DataSetRefs refs = new DataSetRefs(event.getComponentId());
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
index 5d5fcd6..e7e6a91 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
@@ -50,7 +50,7 @@ public class TestHive2JDBC {
     @Test
     public void testDatabaseLineage() {
         final String processorName = "PutHiveQL";
-        final String transitUri = "jdbc:hive2://0.example.com:10000/databaseA";
+        final String transitUri = "jdbc:hive2://0.example.com:10000/database_A";
         final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
         when(record.getComponentType()).thenReturn(processorName);
         when(record.getTransitUri()).thenReturn(transitUri);
@@ -70,8 +70,8 @@ public class TestHive2JDBC {
         assertEquals(1, refs.getOutputs().size());
         Referenceable ref = refs.getOutputs().iterator().next();
         assertEquals("hive_db", ref.getTypeName());
-        assertEquals("databaseA", ref.get(ATTR_NAME));
-        assertEquals("databaseA@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+        assertEquals("database_a", ref.get(ATTR_NAME));
+        assertEquals("database_a@cluster1", ref.get(ATTR_QUALIFIED_NAME));
     }
 
     /**
@@ -81,14 +81,14 @@ public class TestHive2JDBC {
     @Test
     public void testTableLineage() {
         final String processorName = "PutHiveQL";
-        final String transitUri = "jdbc:hive2://0.example.com:10000/databaseA";
+        final String transitUri = "jdbc:hive2://0.example.com:10000/database_A";
         final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
         when(record.getComponentType()).thenReturn(processorName);
         when(record.getTransitUri()).thenReturn(transitUri);
         when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
         // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
-        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
-        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");
+        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2");
+        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1");
 
         final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
         when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
@@ -103,8 +103,8 @@ public class TestHive2JDBC {
         assertEquals(2, refs.getInputs().size());
         // QualifiedName : Name
         final Map<String, String> expectedInputRefs = new HashMap<>();
-        expectedInputRefs.put("databaseA.tableA1@cluster1", "tableA1");
-        expectedInputRefs.put("databaseA.tableA2@cluster1", "tableA2");
+        expectedInputRefs.put("database_a.table_a1@cluster1", "table_a1");
+        expectedInputRefs.put("database_a.table_a2@cluster1", "table_a2");
         for (Referenceable ref : refs.getInputs()) {
             final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
             assertTrue(expectedInputRefs.containsKey(qName));
@@ -114,8 +114,8 @@ public class TestHive2JDBC {
         assertEquals(1, refs.getOutputs().size());
         Referenceable ref = refs.getOutputs().iterator().next();
         assertEquals("hive_table", ref.getTypeName());
-        assertEquals("tableB1", ref.get(ATTR_NAME));
-        assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+        assertEquals("table_b1", ref.get(ATTR_NAME));
+        assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
     }
 
     /**
@@ -131,8 +131,8 @@ public class TestHive2JDBC {
         when(record.getTransitUri()).thenReturn(transitUri);
         when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
         // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
-        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
-        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");
+        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2");
+        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1");
 
         final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
         when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
@@ -147,8 +147,8 @@ public class TestHive2JDBC {
         assertEquals(2, refs.getInputs().size());
         // QualifiedName : Name
         final Map<String, String> expectedInputRefs = new HashMap<>();
-        expectedInputRefs.put("default.tableA1@cluster1", "tableA1");
-        expectedInputRefs.put("default.tableA2@cluster1", "tableA2");
+        expectedInputRefs.put("default.table_a1@cluster1", "table_a1");
+        expectedInputRefs.put("default.table_a2@cluster1", "table_a2");
         for (Referenceable ref : refs.getInputs()) {
             final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
             assertTrue(expectedInputRefs.containsKey(qName));
@@ -158,8 +158,8 @@ public class TestHive2JDBC {
         assertEquals(1, refs.getOutputs().size());
         Referenceable ref = refs.getOutputs().iterator().next();
         assertEquals("hive_table", ref.getTypeName());
-        assertEquals("tableB1", ref.get(ATTR_NAME));
-        assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+        assertEquals("table_b1", ref.get(ATTR_NAME));
+        assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
     }
 
     /**
@@ -174,8 +174,8 @@ public class TestHive2JDBC {
         when(record.getTransitUri()).thenReturn(transitUri);
         when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
         // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
-        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
-        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");
+        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2");
+        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1");
 
         final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
         when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
@@ -190,8 +190,8 @@ public class TestHive2JDBC {
         assertEquals(2, refs.getInputs().size());
         // QualifiedName : Name
         final Map<String, String> expectedInputRefs = new HashMap<>();
-        expectedInputRefs.put("default.tableA1@cluster1", "tableA1");
-        expectedInputRefs.put("default.tableA2@cluster1", "tableA2");
+        expectedInputRefs.put("default.table_a1@cluster1", "table_a1");
+        expectedInputRefs.put("default.table_a2@cluster1", "table_a2");
         for (Referenceable ref : refs.getInputs()) {
             final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
             assertTrue(expectedInputRefs.containsKey(qName));
@@ -201,8 +201,8 @@ public class TestHive2JDBC {
         assertEquals(1, refs.getOutputs().size());
         Referenceable ref = refs.getOutputs().iterator().next();
         assertEquals("hive_table", ref.getTypeName());
-        assertEquals("tableB1", ref.get(ATTR_NAME));
-        assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+        assertEquals("table_b1", ref.get(ATTR_NAME));
+        assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
     }
 
     /**
@@ -219,8 +219,8 @@ public class TestHive2JDBC {
         when(record.getTransitUri()).thenReturn(transitUri);
         when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
         // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
-        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
-        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");
+        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2");
+        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1");
 
         final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
         when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"), eq("2.example.com"))).thenReturn("cluster1");
@@ -235,8 +235,8 @@ public class TestHive2JDBC {
         assertEquals(2, refs.getInputs().size());
         // QualifiedName : Name
         final Map<String, String> expectedInputRefs = new HashMap<>();
-        expectedInputRefs.put("default.tableA1@cluster1", "tableA1");
-        expectedInputRefs.put("default.tableA2@cluster1", "tableA2");
+        expectedInputRefs.put("default.table_a1@cluster1", "table_a1");
+        expectedInputRefs.put("default.table_a2@cluster1", "table_a2");
         for (Referenceable ref : refs.getInputs()) {
             final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
             assertTrue(expectedInputRefs.containsKey(qName));
@@ -246,8 +246,8 @@ public class TestHive2JDBC {
         assertEquals(1, refs.getOutputs().size());
         Referenceable ref = refs.getOutputs().iterator().next();
         assertEquals("hive_table", ref.getTypeName());
-        assertEquals("tableB1", ref.get(ATTR_NAME));
-        assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+        assertEquals("table_b1", ref.get(ATTR_NAME));
+        assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
     }
 
     /**
@@ -262,8 +262,8 @@ public class TestHive2JDBC {
         when(record.getTransitUri()).thenReturn(transitUri);
         when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
         // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
-        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
-        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");
+        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2");
+        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1");
 
         final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
         when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"))).thenReturn("cluster1");
@@ -278,8 +278,8 @@ public class TestHive2JDBC {
         assertEquals(2, refs.getInputs().size());
         // QualifiedName : Name
         final Map<String, String> expectedInputRefs = new HashMap<>();
-        expectedInputRefs.put("some_database.tableA1@cluster1", "tableA1");
-        expectedInputRefs.put("some_database.tableA2@cluster1", "tableA2");
+        expectedInputRefs.put("some_database.table_a1@cluster1", "table_a1");
+        expectedInputRefs.put("some_database.table_a2@cluster1", "table_a2");
         for (Referenceable ref : refs.getInputs()) {
             final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
             assertTrue(expectedInputRefs.containsKey(qName));
@@ -289,8 +289,8 @@ public class TestHive2JDBC {
         assertEquals(1, refs.getOutputs().size());
         Referenceable ref = refs.getOutputs().iterator().next();
         assertEquals("hive_table", ref.getTypeName());
-        assertEquals("tableB1", ref.get(ATTR_NAME));
-        assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+        assertEquals("table_b1", ref.get(ATTR_NAME));
+        assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
     }
 
 }
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java
index 606f6d5..0194cdd 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java
@@ -53,7 +53,7 @@ public class TestPutHiveStreaming {
         when(record.getComponentType()).thenReturn(processorName);
         when(record.getTransitUri()).thenReturn(transitUri);
         when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
-        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseA.tableA");
+        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_A.table_A");
 
         final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
         when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
@@ -69,7 +69,7 @@ public class TestPutHiveStreaming {
         assertEquals(1, refs.getOutputs().size());
         Referenceable ref = refs.getOutputs().iterator().next();
         assertEquals("hive_table", ref.getTypeName());
-        assertEquals("tableA", ref.get(ATTR_NAME));
-        assertEquals("databaseA.tableA@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+        assertEquals("table_a", ref.get(ATTR_NAME));
+        assertEquals("database_a.table_a@cluster1", ref.get(ATTR_QUALIFIED_NAME));
     }
 }