You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/04/07 06:57:42 UTC

[nifi] branch main updated: NIFI-8390: Handle HBase namespaces in Atlas reporting task

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b852ba  NIFI-8390: Handle HBase namespaces in Atlas reporting task
4b852ba is described below

commit 4b852ba7c89097da25bcf9160631eca6e9d9ac28
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Thu Apr 1 21:29:42 2021 +0200

    NIFI-8390: Handle HBase namespaces in Atlas reporting task
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4977.
---
 .../nifi/atlas/provenance/analyzer/HBaseTable.java | 56 ++++++++++---
 .../atlas/provenance/analyzer/TestHBaseTable.java  | 95 +++++++++++++++-------
 2 files changed, 109 insertions(+), 42 deletions(-)

diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java
index e2ff8fb..0d3cd9d 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java
@@ -28,22 +28,29 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_CLUSTER_NAME;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_URI;
 
 /**
  * Analyze a transit URI as a HBase table.
- * <li>qualifiedName=tableName@namespace (example: myTable@ns1)
- * <li>name=tableName (example: myTable)
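+ * <li>qualifiedName=hbaseNamespace:tableName@namespace (example: default:myTable@ns1, where ns1 is the namespace resolved from the host names in the URI)
+ * <li>name=[hbaseNamespace:]tableName (examples: myTable for the default HBase namespace, ns2:myTable for any other HBase namespace)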
  */
 public class HBaseTable extends AbstractNiFiProvenanceEventAnalyzer {
 
     private static final Logger logger = LoggerFactory.getLogger(HBaseTable.class);
-    private static final String TYPE = "hbase_table";
 
-    // hbase://masterAddress/hbaseTableName/hbaseRowId(optional)
-    private static final Pattern URI_PATTERN = Pattern.compile("^hbase://([^/]+)/([^/]+)/?.*$");
+    static final String TYPE_HBASE_TABLE = "hbase_table";
+    static final String TYPE_HBASE_NAMESPACE = "hbase_namespace";
+
+    static final String ATTR_NAMESPACE = "namespace";
+
+    static final String DEFAULT_NAMESPACE = "default";
+
+    // hbase://host1[,host2,...]/[hbaseNamespace:]hbaseTableName[/hbaseRowId]
+    private static final Pattern URI_PATTERN = Pattern.compile("^hbase://([^/]+)/(([^/]+):)?([^/]+)/?.*$");
 
     @Override
     public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
@@ -51,25 +58,48 @@ public class HBaseTable extends AbstractNiFiProvenanceEventAnalyzer {
         final String transitUri = event.getTransitUri();
         final Matcher uriMatcher = URI_PATTERN.matcher(transitUri);
         if (!uriMatcher.matches()) {
-            logger.warn("Unexpected transit URI: {}", new Object[]{transitUri});
+            logger.warn("Unexpected transit URI: {}", transitUri);
             return null;
         }
 
-        final Referenceable ref = new Referenceable(TYPE);
         final String[] hostNames = splitHostNames(uriMatcher.group(1));
         final String namespace = context.getNamespaceResolver().fromHostNames(hostNames);
 
-        final String tableName = uriMatcher.group(2);
-        ref.set(ATTR_NAME, tableName);
-        ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, tableName));
-        // TODO: 'uri' is a mandatory attribute, but what should we set?
-        ref.set(ATTR_URI, transitUri);
+        final String hbaseNamespaceName = uriMatcher.group(3) != null ? uriMatcher.group(3) : DEFAULT_NAMESPACE;
+        final String hbaseTableName = uriMatcher.group(4);
+
+        final Referenceable hbaseNamespaceRef = createHBaseNamespaceRef(namespace, hbaseNamespaceName);
+        final Referenceable hbaseTableRef = getHBaseTableRef(namespace, hbaseTableName, hbaseNamespaceRef);
 
-        return singleDataSetRef(event.getComponentId(), event.getEventType(), ref);
+        return singleDataSetRef(event.getComponentId(), event.getEventType(), hbaseTableRef);
     }
 
     @Override
     public String targetTransitUriPattern() {
         return "^hbase://.+$";
     }
+
+    private Referenceable createHBaseNamespaceRef(String namespace, String hbaseNamespaceName) {
+        final Referenceable hbaseNamespaceRef = new Referenceable(TYPE_HBASE_NAMESPACE);
+
+        hbaseNamespaceRef.set(ATTR_NAME, hbaseNamespaceName);
+        hbaseNamespaceRef.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, hbaseNamespaceName));
+        hbaseNamespaceRef.set(ATTR_CLUSTER_NAME, namespace);
+
+        return hbaseNamespaceRef;
+    }
+
+    private Referenceable getHBaseTableRef(String namespace, String hbaseTableName, Referenceable hbaseNamespaceRef) {
+        final Referenceable hbaseTableRef = new Referenceable(TYPE_HBASE_TABLE);
+
+        final String hbaseTableFullName = String.format("%s:%s", hbaseNamespaceRef.get(ATTR_NAME), hbaseTableName);
+        final boolean isDefaultHBaseNamespace = DEFAULT_NAMESPACE.equals(hbaseNamespaceRef.get(ATTR_NAME));
+
+        hbaseTableRef.set(ATTR_NAME, isDefaultHBaseNamespace ? hbaseTableName : hbaseTableFullName);
+        hbaseTableRef.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, hbaseTableFullName));
+        hbaseTableRef.set(ATTR_NAMESPACE, hbaseNamespaceRef);
+        hbaseTableRef.set(ATTR_URI, isDefaultHBaseNamespace ? hbaseTableName : hbaseTableFullName);
+
+        return hbaseTableRef;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHBaseTable.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHBaseTable.java
index 5d7296c..019b7f5 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHBaseTable.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHBaseTable.java
@@ -27,50 +27,59 @@ import org.apache.nifi.provenance.ProvenanceEventType;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_CLUSTER_NAME;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URI;
+import static org.apache.nifi.atlas.provenance.analyzer.HBaseTable.ATTR_NAMESPACE;
+import static org.apache.nifi.atlas.provenance.analyzer.HBaseTable.TYPE_HBASE_NAMESPACE;
+import static org.apache.nifi.atlas.provenance.analyzer.HBaseTable.TYPE_HBASE_TABLE;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.mockito.ArgumentMatchers.matches;
 import static org.mockito.Mockito.when;
 
 public class TestHBaseTable {
 
+    private static final String PROCESSOR_NAME = "FetchHBaseRow";
+    private static final String ATLAS_METADATA_NAMESPACE = "namespace1";
+
     @Test
-    public void testHBaseTable() {
-        final String processorName = "FetchHBaseRow";
+    public void testHBaseTableImplicitDefaultNamespace() {
         final String transitUri = "hbase://0.example.com/tableA/rowB";
-        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
-        when(record.getComponentType()).thenReturn(processorName);
-        when(record.getTransitUri()).thenReturn(transitUri);
-        when(record.getEventType()).thenReturn(ProvenanceEventType.FETCH);
+        final ProvenanceEventRecord record = mockProvenanceEventRecord(transitUri);
+
+        final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
+        when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn(ATLAS_METADATA_NAMESPACE);
+
+        executeTest(record, namespaceResolvers, "tableA", "default:tableA@namespace1", "tableA", "default", "default@namespace1");
+    }
+
+    @Test
+    public void testHBaseTableExplicitDefaultNamespace() {
+        final String transitUri = "hbase://0.example.com/default:tableA/rowB";
+        final ProvenanceEventRecord record = mockProvenanceEventRecord(transitUri);
 
         final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
         when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
 
-        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
-        when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
+        executeTest(record, namespaceResolvers, "tableA", "default:tableA@namespace1", "tableA", "default", "default@namespace1");
+    }
 
-        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
-        assertNotNull(analyzer);
+    @Test
+    public void testHBaseTableCustomNamespace() {
+        final String transitUri = "hbase://0.example.com/namespaceA:tableA/rowB";
+        final ProvenanceEventRecord record = mockProvenanceEventRecord(transitUri);
 
-        final DataSetRefs refs = analyzer.analyze(context, record);
-        assertEquals(1, refs.getInputs().size());
-        assertEquals(0, refs.getOutputs().size());
-        Referenceable ref = refs.getInputs().iterator().next();
-        assertEquals("hbase_table", ref.getTypeName());
-        assertEquals("tableA", ref.get(ATTR_NAME));
-        assertEquals("tableA@namespace1", ref.get(ATTR_QUALIFIED_NAME));
+        final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
+        when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
+
+        executeTest(record, namespaceResolvers, "namespaceA:tableA", "namespaceA:tableA@namespace1", "namespaceA:tableA", "namespaceA", "namespaceA@namespace1");
     }
 
     @Test
     public void testHBaseTableWithMultipleZkHosts() {
-        final String processorName = "FetchHBaseRow";
         final String transitUri = "hbase://zk0.example.com,zk2.example.com,zk3.example.com/tableA/rowB";
-        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
-        when(record.getComponentType()).thenReturn(processorName);
-        when(record.getTransitUri()).thenReturn(transitUri);
-        when(record.getEventType()).thenReturn(ProvenanceEventType.FETCH);
+        final ProvenanceEventRecord record = mockProvenanceEventRecord(transitUri);
 
         final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
         when(namespaceResolvers.fromHostNames(
@@ -78,19 +87,47 @@ public class TestHBaseTable {
                 matches("zk2.example.com"),
                 matches("zk3.example.com"))).thenReturn("namespace1");
 
+        executeTest(record, namespaceResolvers, "tableA", "default:tableA@namespace1", "tableA", "default", "default@namespace1");
+    }
+
+    private ProvenanceEventRecord mockProvenanceEventRecord(String transitUri) {
+        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+
+        when(record.getEventType()).thenReturn(ProvenanceEventType.FETCH);
+        when(record.getComponentType()).thenReturn(PROCESSOR_NAME);
+        when(record.getTransitUri()).thenReturn(transitUri);
+
+        return record;
+    }
+
+    private void executeTest(ProvenanceEventRecord record, NamespaceResolvers namespaceResolvers, String expectedTableName, String expectedTableQualifiedName, String expectedTableUri,
+                             String expectedNamespaceName, String expectedNamespaceQualifiedName) {
         final AnalysisContext context = Mockito.mock(AnalysisContext.class);
         when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
 
-        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
-        assertNotNull(analyzer);
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(PROCESSOR_NAME, record.getTransitUri(), record.getEventType());
 
         final DataSetRefs refs = analyzer.analyze(context, record);
+
+        assertAnalysisResult(refs, expectedTableName, expectedTableQualifiedName, expectedTableUri, expectedNamespaceName, expectedNamespaceQualifiedName);
+    }
+
+    private void assertAnalysisResult(DataSetRefs refs, String expectedTableName, String expectedTableQualifiedName, String expectedTableUri,
+                                      String expectedNamespaceName, String expectedNamespaceQualifiedName) {
         assertEquals(1, refs.getInputs().size());
         assertEquals(0, refs.getOutputs().size());
-        Referenceable ref = refs.getInputs().iterator().next();
-        assertEquals("hbase_table", ref.getTypeName());
-        assertEquals("tableA", ref.get(ATTR_NAME));
-        assertEquals("tableA@namespace1", ref.get(ATTR_QUALIFIED_NAME));
+
+        Referenceable tableRef = refs.getInputs().iterator().next();
+        assertEquals(TYPE_HBASE_TABLE, tableRef.getTypeName());
+        assertEquals(expectedTableName, tableRef.get(ATTR_NAME));
+        assertEquals(expectedTableQualifiedName, tableRef.get(ATTR_QUALIFIED_NAME));
+        assertEquals(expectedTableUri, tableRef.get(ATTR_URI));
+
+        Referenceable namespaceRef = (Referenceable) tableRef.get(ATTR_NAMESPACE);
+        assertEquals(TYPE_HBASE_NAMESPACE, namespaceRef.getTypeName());
+        assertEquals(expectedNamespaceName, namespaceRef.get(ATTR_NAME));
+        assertEquals(expectedNamespaceQualifiedName, namespaceRef.get(ATTR_QUALIFIED_NAME));
+        assertEquals(ATLAS_METADATA_NAMESPACE, namespaceRef.get(ATTR_CLUSTER_NAME));
     }
 
 }