You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pb...@apache.org on 2017/05/08 10:40:26 UTC

[20/37] oozie git commit: OOZIE-2701 Oozie to support Multiple HCatalog URIs (abhishekbafna)

OOZIE-2701 Oozie to support Multiple HCatalog URIs (abhishekbafna)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/89be33bb
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/89be33bb
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/89be33bb

Branch: refs/heads/oya
Commit: 89be33bb134285faa7d22d756ef34f56ef490e5f
Parents: ce7eb31
Author: abhisek bafna <ab...@hortonworks.com>
Authored: Thu Apr 6 19:40:55 2017 +0530
Committer: abhisek bafna <ab...@hortonworks.com>
Committed: Thu Apr 6 19:40:55 2017 +0530

----------------------------------------------------------------------
 .../org/apache/oozie/coord/HCatELFunctions.java | 15 ++--
 .../apache/oozie/dependency/HCatURIHandler.java | 12 ++-
 core/src/main/resources/oozie-default.xml       |  6 ++
 .../apache/oozie/util/TestHCatURIParser.java    | 87 ++++++++++++++++++++
 .../src/site/twiki/DG_HCatalogIntegration.twiki | 33 ++++++++
 release-log.txt                                 |  1 +
 .../java/org/apache/oozie/util/HCatURI.java     | 21 +++++
 .../org/apache/oozie/util/HCatURIParser.java    | 48 +++++++++++
 .../java/org/apache/oozie/util/TestHCatURI.java | 21 +++--
 9 files changed, 227 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/89be33bb/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java b/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java
index 9475f72..f40f406 100644
--- a/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java
+++ b/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java
@@ -20,15 +20,18 @@ package org.apache.oozie.coord;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.DagELFunctions;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.util.ELEvaluator;
 import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.HCatURIParser;
 import org.apache.oozie.util.XLog;
 
 /**
@@ -37,6 +40,8 @@ import org.apache.oozie.util.XLog;
 
 public class HCatELFunctions {
     private static final Configuration EMPTY_CONF = new Configuration(true);
+    private static final String HCAT_URI_REGEX_CONFIG = ConfigurationService.get("oozie.hcat.uri.regex.pattern");
+    private static final Pattern HCAT_URI_PATTERN = Pattern.compile(HCAT_URI_REGEX_CONFIG);
 
     enum EventType {
         input, output
@@ -291,7 +296,7 @@ public class HCatELFunctions {
         String partitionValue = null;
         if (uri != null) {
             if (type.equals("hive-export")) {
-                String[] uriList = uri.split(CoordELFunctions.DIR_SEPARATOR);
+                String[] uriList = HCatURIParser.splitHCatUris(uri, HCAT_URI_PATTERN);
                 if (uriList.length > 1) {
                     throw new RuntimeException("Multiple partitions not supported for hive-export type. Dataset name: "
                         + dataInName + " URI: " + uri);
@@ -331,7 +336,7 @@ public class HCatELFunctions {
         }
         String minPartition = null;
         if (uris != null) {
-            String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
+            String[] uriList = HCatURIParser.splitHCatUris(uris, HCAT_URI_PATTERN);
             // get the partition values list and find minimum
             try {
                 // initialize minValue with first partition value
@@ -376,7 +381,7 @@ public class HCatELFunctions {
         }
         String maxPartition = null;
         if (uris != null) {
-            String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
+            String[] uriList = HCatURIParser.splitHCatUris(uris, HCAT_URI_PATTERN);
             // get the partition values list and find minimum
             try {
                 // initialize minValue with first partition value
@@ -403,7 +408,7 @@ public class HCatELFunctions {
     }
 
     private static String createPartitionFilter(String uris, String type) {
-        String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
+        String[] uriList = HCatURIParser.splitHCatUris(uris, HCAT_URI_PATTERN);
         StringBuilder filter = new StringBuilder("");
         if (uriList.length > 0) {
             for (String uri : uriList) {
@@ -433,7 +438,7 @@ public class HCatELFunctions {
             uris = (String) eval.getVariable(".dataout." + dataInName);
         }
         if (uris != null) {
-            String[] uri = uris.split(CoordELFunctions.DIR_SEPARATOR, -1);
+            String[] uri = HCatURIParser.splitHCatUris(uris, HCAT_URI_PATTERN);
             uriTemplate.append(uri[0]);
         }
         else {

http://git-wip-us.apache.org/repos/asf/oozie/blob/89be33bb/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java b/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
index 4cc284a..c60c811 100644
--- a/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
+++ b/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
@@ -241,7 +241,7 @@ public class HCatURIHandler implements URIHandler {
 
     }
 
-    private HiveConf getHiveConf(URI uri, Configuration conf){
+    private HiveConf getHiveConf(URI uri, Configuration conf) throws HCatAccessorException {
         HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
         if (hcatService.getHCatConf() != null) {
             conf = hcatService.getHCatConf();
@@ -319,7 +319,7 @@ public class HCatURIHandler implements URIHandler {
         }
     }
 
-    private String getMetastoreConnectURI(URI uri) {
+    private String getMetastoreConnectURI(URI uri) throws HCatAccessorException {
         String metastoreURI;
         // For unit tests
         if (uri.getAuthority().equals("unittest-local")) {
@@ -328,7 +328,13 @@ public class HCatURIHandler implements URIHandler {
         else {
             // Hardcoding hcat to thrift mapping till support for webhcat(templeton)
             // is added
-            metastoreURI = "thrift://" + uri.getAuthority();
+            HCatURI hCatURI;
+            try {
+                hCatURI = new HCatURI(uri.toString());
+                metastoreURI = hCatURI.getServerEndPointWithScheme("thrift");
+            } catch (URISyntaxException e) {
+                throw new HCatAccessorException(ErrorCode.E0902, e);
+            }
         }
         return metastoreURI;
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/89be33bb/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index fe095ca..ae384b4 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -3009,4 +3009,10 @@ will be the requeue interval for the actions which are waiting for a long time w
         </description>
     </property>
 
+    <property>
+        <name>oozie.hcat.uri.regex.pattern</name>
+        <value>([a-z]+://[\w\.\-]+:\d+[,]*)+/\w+/\w+/?[\w+=;\-]*</value>
+        <description>Regex used to split a string holding multiple HCat URIs into individual URIs. Override it in
+            oozie-site.xml if your HCat URIs do not match the default pattern.</description>
+    </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/oozie/blob/89be33bb/core/src/test/java/org/apache/oozie/util/TestHCatURIParser.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestHCatURIParser.java b/core/src/test/java/org/apache/oozie/util/TestHCatURIParser.java
new file mode 100644
index 0000000..54cda3a
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/util/TestHCatURIParser.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XTestCase;
+
+import java.net.URI;
+import java.util.regex.Pattern;
+
+/** Tests for {@link HCatURIParser}, using the configured {@code oozie.hcat.uri.regex.pattern} to split URIs. */
+public class TestHCatURIParser extends XTestCase {
+
+    private Services services;
+    private Pattern hcatUriPattern;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+        hcatUriPattern = Pattern.compile(ConfigurationService.get("oozie.hcat.uri.regex.pattern"));
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy(); // release services first, mirroring setUp() in reverse order
+        super.tearDown();
+    }
+
+    public void testWhenMultipleHCatURIsAreSplitPartsAreExtractedCorrectly1() {
+        String uri = "hcat://hostname1:1000,hcat://hostname2:2000/mydb/clicks/datastamp=12;region=us,scheme://hostname3:3000," +
+                "scheme://hostname4:4000,scheme://hostname5:5000/db/table/p1=12;p2=us,scheme://hostname4:4000/d/t/p=1";
+        String[] uris = HCatURIParser.splitHCatUris(uri, hcatUriPattern);
+        assertEquals(3, uris.length);
+        assertEquals("hcat://hostname1:1000,hcat://hostname2:2000/mydb/clicks/datastamp=12;region=us", uris[0]);
+        assertEquals("scheme://hostname3:3000,scheme://hostname4:4000,scheme://hostname5:5000/db/table/p1=12;p2=us", uris[1]);
+        assertEquals("scheme://hostname4:4000/d/t/p=1", uris[2]);
+    }
+
+    public void testWhenMultipleHCatURIsAreSplitPartsAreExtractedCorrectly2() {
+        String uri = "thrift://host.name1:1000/mydb/clicks/datastamp=12;region=u_s";
+        String[] uris = HCatURIParser.splitHCatUris(uri, hcatUriPattern);
+        assertEquals(1, uris.length);
+        assertEquals("thrift://host.name1:1000/mydb/clicks/datastamp=12;region=u_s", uris[0]);
+    }
+
+    public void testWhenMultipleHCatURIsAreSplitPartsAreExtractedCorrectly3() {
+        String uri = "hcat://10.10.10.10:9083/default/invites/ds=2010-01-01;region=usa";
+        String[] uris = HCatURIParser.splitHCatUris(uri, hcatUriPattern);
+        assertEquals(1, uris.length);
+        assertEquals("hcat://10.10.10.10:9083/default/invites/ds=2010-01-01;region=usa", uris[0]);
+    }
+
+    public void testParsingMultipleHCatServerURI() throws Exception {
+        String uriStr = "hcat://hcat.server.com:5080,hcat://hcat.server1.com:5080/mydb/clicks/datastamp=12;region=us";
+        String[] uris = HCatURIParser.splitHCatUris(uriStr, hcatUriPattern);
+        URI uri = HCatURIParser.parseURI(new URI(uris[0]));
+        assertEquals("hcat", uri.getScheme());
+        assertEquals("hcat.server.com:5080,hcat.server1.com:5080", uri.getAuthority());
+    }
+
+    public void testParsingSingleServerURI() throws Exception {
+        String uriStr = "hdfs://namenode.example.com:8020/path/to/directory/file";
+        URI uri = HCatURIParser.parseURI(new URI(uriStr));
+        assertEquals("hdfs", uri.getScheme());
+        assertEquals("namenode.example.com:8020", uri.getAuthority());
+        assertEquals("/path/to/directory/file", uri.getPath());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/89be33bb/docs/src/site/twiki/DG_HCatalogIntegration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_HCatalogIntegration.twiki b/docs/src/site/twiki/DG_HCatalogIntegration.twiki
index 39df67c..d3107b4 100644
--- a/docs/src/site/twiki/DG_HCatalogIntegration.twiki
+++ b/docs/src/site/twiki/DG_HCatalogIntegration.twiki
@@ -59,9 +59,11 @@ Oozie supports specifying HCatalog partitions as a data dependency through a URI
 used to identify a set of table partitions: hcat://bar:8020/logsDB/logsTable/dt=20090415;region=US.
 
 The format to specify a HCatalog table URI is:
+
 hcat://[metastore server]:[port]/[database name]/[table name]
 
 The format to specify a HCatalog table partition URI is:
+
 hcat://[metastore server]:[port]/[database name]/[table name]/[partkey1]=[value];[partkey2]=[value];...
 
 For example,
@@ -74,6 +76,37 @@ For example,
   </dataset>
 </verbatim>
 
+After the Oozie 4.3.0 release, Oozie also supports multiple HCatalog servers in a single URI. Servers are separated by
+a single comma (,), and each server keeps its own hcat:// prefix so that the default split regex (see below) matches it.
+
+The format to specify a HCatalog table partition URI with multiple HCatalog servers is:
+
+hcat://[metastore_server]:[port],hcat://[metastore_server]:[port]/[database_name]/[table_name]/[partkey1]=[value];[partkey2]=[value];...
+
+For example,
+<verbatim>
+  <dataset name="logs" frequency="${coord:days(1)}"
+           initial-instance="2009-02-15T08:15Z" timezone="America/Los_Angeles">
+    <uri-template>
+      hcat://myhcatmetastore1:9080,hcat://myhcatmetastore2:9080/database1/table1/datestamp=${YEAR}${MONTH}${DAY}${HOUR};region=USA
+    </uri-template>
+  </dataset>
+</verbatim>
+
+The regex used to split multiple HCatalog URIs is configurable, so users can override it in oozie-site.xml if
+needed. The configuration key is: =oozie.hcat.uri.regex.pattern=
+
+For example, the following string contains two HCatalog URIs, each with multiple HCatalog servers. Oozie splits it
+into two separate HCatalog URIs using the regex mentioned above.
+
+hcat://hostname1:1000,hcat://hostname2:2000/mydb/clicks/datastamp=12;region=us,scheme://hostname3:3000,scheme://hostname4:4000,scheme://hostname5:5000/db/table/p1=12;p2=us
+
+After the split (this happens internally in Oozie):
+
+hcat://hostname1:1000,hcat://hostname2:2000/mydb/clicks/datastamp=12;region=us
+
+scheme://hostname3:3000,scheme://hostname4:4000,scheme://hostname5:5000/db/table/p1=12;p2=us
+
 #HCatalogLibraries
 ---+++ HCatalog Libraries
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/89be33bb/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 4ccc9e5..845912c 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.4.0 release (trunk - unreleased)
 
+OOZIE-2701 Oozie to support Multiple HCatalog URIs (abhishekbafna)
 OOZIE-2850 Fix default callback notifications (asasvari via gezapeti)
 OOZIE-1283 Remove the old ssh documentation (Jan Hentschel via rkanter)
 OOZIE-2845 Replace reflection-based code which sets variable in HiveConf (pbacsko via abhishekbafna)

http://git-wip-us.apache.org/repos/asf/oozie/blob/89be33bb/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java
----------------------------------------------------------------------
diff --git a/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java b/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java
index faeff2a..88ad762 100644
--- a/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java
+++ b/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURI.java
@@ -55,6 +55,7 @@ public class HCatURI {
 
     private void parse(URI uri) throws URISyntaxException {
 
+        uri = HCatURIParser.parseURI(uri);
         this.uri = uri;
 
         if (uri.getAuthority() == null) {
@@ -107,6 +108,26 @@ public class HCatURI {
     }
 
     /**
+     * Return server end points with given scheme
+     * @param scheme uri scheme
+     * @return server end point with given scheme
+     */
+    public String getServerEndPointWithScheme(String scheme) {
+        // The authority can list several comma-separated host:port entries (parse() runs
+        // HCatURIParser.parseURI, which drops the repeated schemes). Prefix each entry with
+        // the requested scheme and join the results back together with commas.
+        String[] hostPorts = uri.getAuthority().split(",");
+        StringBuilder endPoints = new StringBuilder();
+        String separator = "";
+        for (String hostPort : hostPorts) {
+            endPoints.append(separator).append(scheme).append("://").append(hostPort);
+            separator = ",";
+        }
+
+        return endPoints.toString();
+    }
+
+    /**
      * @return fully qualified server address
      */
     public String getServerEndPoint() {

http://git-wip-us.apache.org/repos/asf/oozie/blob/89be33bb/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURIParser.java
----------------------------------------------------------------------
diff --git a/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURIParser.java b/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURIParser.java
new file mode 100644
index 0000000..94fc93a
--- /dev/null
+++ b/sharelib/hcatalog/src/main/java/org/apache/oozie/util/HCatURIParser.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class HCatURIParser {
+    /** Splits a string holding one or more HCat URIs into the substrings matched by {@code pattern}. */
+    public static String[] splitHCatUris(String uri, Pattern pattern) {
+        List<String> list = new ArrayList<>();
+        Matcher matcher = pattern.matcher(uri);
+        while (matcher.find()) {
+            list.add(matcher.group());
+        }
+        return list.toArray(new String[list.size()]);
+    }
+
+    /** Collapses repeated schemes: {@code hcat://a:1,hcat://b:2/db/t} becomes {@code hcat://a:1,b:2/db/t}. */
+    static URI parseURI(URI uri) throws URISyntaxException {
+        String uriStr = uri.toString();
+        int index = uriStr.indexOf("://");
+        if (index < 0) { return uri; } // no "scheme://" prefix, so there is nothing to collapse
+        String scheme = uriStr.substring(0, index + 3);
+        // Literal replace(): replaceAll() would read scheme chars such as '+' or '.' as regex syntax.
+        return new URI(scheme + uriStr.substring(index + 3).replace(scheme, ""));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/89be33bb/sharelib/hcatalog/src/test/java/org/apache/oozie/util/TestHCatURI.java
----------------------------------------------------------------------
diff --git a/sharelib/hcatalog/src/test/java/org/apache/oozie/util/TestHCatURI.java b/sharelib/hcatalog/src/test/java/org/apache/oozie/util/TestHCatURI.java
index ce690da..eceda81 100644
--- a/sharelib/hcatalog/src/test/java/org/apache/oozie/util/TestHCatURI.java
+++ b/sharelib/hcatalog/src/test/java/org/apache/oozie/util/TestHCatURI.java
@@ -24,26 +24,29 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.junit.Test;
-import org.apache.oozie.util.HCatURI;
 
 public class TestHCatURI {
 
     @Test
-    public void testHCatURIParseValidURI() {
+    public void testHCatURIParseValidURI() throws URISyntaxException {
         String input = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12;region=us";
-        HCatURI uri = null;
-        try {
-            uri = new HCatURI(input);
-        }
-        catch (Exception ex) {
-            System.err.print(ex.getMessage());
-        }
+        HCatURI uri = new HCatURI(input); // no try/catch: a URISyntaxException must fail the test
         assertEquals(uri.getServerEndPoint(), "hcat://hcat.server.com:5080");
         assertEquals(uri.getDb(), "mydb");
         assertEquals(uri.getTable(), "clicks");
         assertEquals(uri.getPartitionValue("datastamp"), "12");
         assertEquals(uri.getPartitionValue("region"), "us");
     }
+    /** Repeated "hcat://" prefixes collapse into one URI whose db, table and partitions still resolve. */
+    @Test
+    public void whenMultipleHCatURIsAreParsedASingleURIIsExtracted() throws URISyntaxException {
+        HCatURI uri = new HCatURI("hcat://hcat.server.com:5080,hcat://hcat.server1.com:5080/mydb/clicks/datastamp=12;region=us");
+        assertEquals("hcat://hcat.server.com:5080,hcat://hcat.server1.com:5080", uri.getServerEndPointWithScheme("hcat"));
+        assertEquals("mydb", uri.getDb());
+        assertEquals("clicks", uri.getTable());
+        assertEquals("12", uri.getPartitionValue("datastamp"));
+        assertEquals("us", uri.getPartitionValue("region"));
+    }
 
     @Test
     public void testHCatTableURI() {