Posted to commits@kylin.apache.org by ni...@apache.org on 2020/05/06 11:54:23 UTC

[kylin] 01/02: KYLIN-4385 Fix HiveProducer can not write to Hive Table[AWS Azure]

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch 3.0.x
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 619e05e8001f6fa0c0927e16eab650ff87787ef9
Author: XiaoxiangYu <hi...@126.com>
AuthorDate: Wed Apr 1 18:32:14 2020 +0800

    KYLIN-4385 Fix HiveProducer can not write to Hive Table[AWS Azure]
---
 .../kylin/metrics/lib/impl/hive/HiveProducer.java  | 77 +++++++++++++++++-----
 1 file changed, 61 insertions(+), 16 deletions(-)
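
The heart of the change is a scheme check on the FileSystem URI: object stores such as AWS S3 (s3, s3a, s3n) and Azure Blob Storage (wasb, wasbs) do not implement FileSystem.append(), so the producer falls back to creating a fresh file per write. Below is a minimal, self-contained sketch of that check against Hadoop's public FileSystem API; the class and method names are illustrative, not part of HiveProducer.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class AppendSupportCheck {

        // s3* and wasb* URIs identify object stores (AWS EMR, Azure HDInsight)
        // whose FileSystem implementations reject append(), so detect them by
        // URI scheme up front and avoid calling append() there entirely.
        static boolean isAppendSupported(FileSystem fs) {
            String fsUri = fs.getUri().toString();
            return !fsUri.startsWith("s3") && !fsUri.startsWith("wasb");
        }

        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            System.out.println("append supported: " + isAppendSupported(fs));
        }
    }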

diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
index 5082b6a..ae10a93 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
@@ -66,6 +66,11 @@ public class HiveProducer {
     private Path curPartitionContentPath;
     private int id = 0;
     private FSDataOutputStream fout;
+    /**
+     * Some cloud file systems, such as AWS S3 and Azure Blob Storage, do not support appending to an existing file.
+     * When append is not supported, a new file is produced on each call to the write method.
+     */
+    private final boolean supportAppend;
 
     public HiveProducer(String metricType, Properties props) throws Exception {
         this(metricType, props, new HiveConf());
@@ -96,6 +101,7 @@ public class HiveProducer {
                         HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(hiveConf);
                         String tableLocation = metaStoreClient.getTable(tableName.getFirst(), tableName.getSecond())
                                 .getSd().getLocation();
+                        logger.debug("Found table location for {} at {}", tableName.getSecond(), tableLocation);
                         List<FieldSchema> fields = metaStoreClient.getFields(tableName.getFirst(),
                                 tableName.getSecond());
                         metaStoreClient.close();
@@ -110,6 +116,9 @@ public class HiveProducer {
             hostName = "UNKNOWN";
         }
         contentFilePrefix = hostName + "-" + System.currentTimeMillis() + "-part-";
+        String fsUri = fs.getUri().toString();
+        supportAppend = !fsUri.startsWith("s3") && !fsUri.startsWith("wasb"); // AWS EMR and Azure HDInsight
+        logger.info("For {}, supportAppend is set to {}", fsUri, supportAppend);
     }
 
     public void close() {
@@ -126,7 +135,7 @@ public class HiveProducer {
         for (Record record : recordList) {
             HiveProducerRecord hiveRecord = convertTo(record);
             if (recordMap.get(hiveRecord.key()) == null) {
-                recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord> newLinkedList());
+                recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord>newLinkedList());
             }
             recordMap.get(hiveRecord.key()).add(hiveRecord);
         }
@@ -174,17 +183,31 @@ public class HiveProducer {
             }
             hql.append(")");
             logger.debug("create partition by {}.", hql);
-            Driver driver = new Driver(hiveConf);
-            CliSessionState session = new CliSessionState(hiveConf);
-            SessionState.start(session);
-            CommandProcessorResponse res = driver.run(hql.toString());
-            if (res.getResponseCode() != 0) {
-                logger.warn("Fail to add partition. HQL: {}; Cause by: {}",
-                        hql.toString(),
-                        res.toString());
+            Driver driver = null;
+            CliSessionState session = null;
+            try {
+                driver = new Driver(hiveConf);
+                session = new CliSessionState(hiveConf);
+                SessionState.start(session);
+                CommandProcessorResponse res = driver.run(hql.toString());
+                if (res.getResponseCode() != 0) {
+                    logger.warn("Fail to add partition. HQL: {}; Cause by: {}",
+                            hql.toString(),
+                            res.toString());
+                }
+                session.close();
+                driver.close();
+            } catch (Exception ex) {
+                // Do not let a Hive exception stop HiveProducer from writing the file, so catch and report it here
+                logger.error("Create partition failed, please create it manually: " + hql, ex);
+            } finally {
+                if (session != null) {
+                    session.close();
+                }
+                if (driver != null) {
+                    driver.close();
+                }
             }
-            session.close();
-            driver.close();
         }
 
         // Step 3: create the path for a new partition if this is the first write of metrics messages or a new partition should be used
@@ -194,7 +217,21 @@ public class HiveProducer {
                 closeFout();
             }
 
-            Path partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%04d", id));
+            Path partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%05d", id));
+
+            // Do not overwrite existing files when supportAppend is false
+            int nCheck = 0;
+            while (!supportAppend && fs.exists(partitionContentPath)) {
+                id++;
+                nCheck++;
+                partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%05d", id));
+                logger.debug("{} exists, skipping it.", partitionContentPath);
+                if (nCheck > 100000) {
+                    logger.warn("Exceeded the maximum number of file name checks.");
+                    break;
+                }
+            }
+
             logger.info("Try to use new partition content path: {} for metric: {}", partitionContentPath, metricType);
             if (!fs.exists(partitionContentPath)) {
                 int nRetry = 0;
@@ -209,30 +246,38 @@ public class HiveProducer {
                             "Fail to create HDFS file: " + partitionContentPath + " after " + nRetry + " retries");
                 }
             }
-            fout = fs.append(partitionContentPath);
+            if (supportAppend) {
+                fout = fs.append(partitionContentPath);
+            } else {
+                fout = fs.create(partitionContentPath);
+            }
             prePartitionPath = partitionPath.toString();
             curPartitionContentPath = partitionContentPath;
-            id = (id + 1) % 10;
+            id = (id + 1) % (supportAppend ? 10 : 100000);
         }
 
-        // Step 4: append record to HDFS without flush
+        // Step 4: append records to the DFS
         try {
             int count = 0;
             for (HiveProducerRecord elem : recordItr) {
                 fout.writeBytes(elem.valueToString() + "\n");
                 count++;
             }
-            logger.info("Success to write {} metrics ({}) to file {}", count, metricType, curPartitionContentPath);
+            logger.debug("Successfully wrote {} metrics ({}) to file {}", count, metricType, curPartitionContentPath);
         } catch (IOException e) {
             logger.error("Fails to write metrics(" + metricType + ") to file " + curPartitionContentPath.toString()
                     + " due to ", e);
             closeFout();
         }
+        if (!supportAppend) {
+            closeFout();
+        }
     }
 
     private void closeFout() {
         if (fout != null) {
             try {
+                logger.debug("Flush output stream {}.", curPartitionContentPath);
                 fout.close();
             } catch (Exception e) {
                 logger.error("Close the path: " + curPartitionContentPath + " failed", e);