Posted to commits@kylin.apache.org by xx...@apache.org on 2021/01/17 15:17:50 UTC

[kylin] 04/07: KYLIN-4573 Add option to indicate whether to close file for every append for Hive Producer

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 3400338d7870c843e5ab2490c6767aa37e8092e4
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Mon Jun 15 16:41:36 2020 +0800

    KYLIN-4573 Add option to indicate whether to close file for every append for Hive Producer
    
    (cherry picked from commit 616e06675278a6857f3cbb353a4f9c2243eeccc1)
---
 .../kylin/metrics/lib/impl/hive/HiveProducer.java  | 93 ++++++++++++++++++----
 server/src/main/resources/kylinMetrics.xml         |  1 +
 2 files changed, 77 insertions(+), 17 deletions(-)
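
Before the diff itself, a minimal sketch of the behavior this patch introduces may help
(the class and method names below are illustrative, not the committed code): the producer
inspects the filesystem URI scheme, treats only HDFS as appendable, and falls back to
creating a fresh file per write on object stores such as S3.

    // Hedged sketch of the append-vs-create decision, assuming only the
    // Hadoop FileSystem API; AppendOrCreate, supportsAppend, and open are
    // illustrative names, not identifiers from the patch.
    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class AppendOrCreate {

        // Mirrors the patch's check: only hdfs:// URIs count as appendable.
        static boolean supportsAppend(FileSystem fs) {
            return fs.getUri().toString().startsWith("hdfs");
        }

        // Append on HDFS; on stores without append (e.g. S3), create a new file.
        static FSDataOutputStream open(FileSystem fs, Path path) throws IOException {
            return supportsAppend(fs) ? fs.append(path) : fs.create(path);
        }
    }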

diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
index a96b261..8bc7a43 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -66,6 +67,15 @@ public class HiveProducer {
     private Path curPartitionContentPath;
     private int id = 0;
     private FSDataOutputStream fout;
+    /**
+     * Some cloud file systems, like AWS S3, do not support appending to an existing file.
+     * When append is not supported, each call to the write method produces a new file.
+     */
+    private final boolean supportAppend;
+
+    private final boolean closeFileEveryAppend;
+
+    private final Map<String, String> kylinSpecifiedConfig = new HashMap<>();
 
     public HiveProducer(String metricType, Properties props) throws Exception {
         this(metricType, props, new HiveConf());
@@ -75,7 +85,13 @@ public class HiveProducer {
         this.metricType = metricType;
         hiveConf = hiveConfig;
         for (Map.Entry<Object, Object> e : props.entrySet()) {
-            hiveConf.set(e.getKey().toString(), e.getValue().toString());
+            String key = e.getKey().toString();
+            String value = e.getValue().toString();
+            if (key.startsWith("kylin.")) {
+                kylinSpecifiedConfig.put(key, value);
+            } else {
+                hiveConf.set(key, value);
+            }
         }
 
         fs = FileSystem.get(hiveConf);
@@ -96,6 +112,7 @@ public class HiveProducer {
                         IMetaStoreClient metaStoreClient = HiveMetaStoreClientFactory.getHiveMetaStoreClient(hiveConf);
                         String tableLocation = metaStoreClient.getTable(tableName.getFirst(), tableName.getSecond())
                                 .getSd().getLocation();
+                        logger.debug("Found table location for {} at {}", tableName.getSecond(), tableLocation);
                         List<FieldSchema> fields = metaStoreClient.getFields(tableName.getFirst(),
                                 tableName.getSecond());
                         metaStoreClient.close();
@@ -110,6 +127,12 @@ public class HiveProducer {
             hostName = "UNKNOWN";
         }
         contentFilePrefix = hostName + "-" + System.currentTimeMillis() + "-part-";
+        String fsUri = fs.getUri().toString();
+        supportAppend = fsUri.startsWith("hdfs"); // Only HDFS is appendable
+        logger.info("For {}, supportAppend was set to {}", fsUri, supportAppend);
+
+        closeFileEveryAppend = !supportAppend
+                || Boolean.parseBoolean(kylinSpecifiedConfig.get("kylin.hive.producer.close-file-every-append"));
     }
 
     public void close() {
@@ -127,7 +150,7 @@ public class HiveProducer {
         for (Record record : recordList) {
             HiveProducerRecord hiveRecord = convertTo(record);
             if (recordMap.get(hiveRecord.key()) == null) {
-                recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord> newLinkedList());
+                recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord>newLinkedList());
             }
             recordMap.get(hiveRecord.key()).add(hiveRecord);
         }
@@ -175,17 +198,31 @@ public class HiveProducer {
             }
             hql.append(")");
             logger.debug("create partition by {}.", hql);
-            Driver driver = new Driver(hiveConf);
-            CliSessionState session = new CliSessionState(hiveConf);
-            SessionState.start(session);
-            CommandProcessorResponse res = driver.run(hql.toString());
-            if (res.getResponseCode() != 0) {
-                logger.warn("Fail to add partition. HQL: {}; Cause by: {}",
-                        hql.toString(),
-                        res.toString());
+            Driver driver = null;
+            CliSessionState session = null;
+            try {
+                driver = new Driver(hiveConf);
+                session = new CliSessionState(hiveConf);
+                SessionState.start(session);
+                CommandProcessorResponse res = driver.run(hql.toString());
+                if (res.getResponseCode() != 0) {
+                    logger.warn("Failed to add partition. HQL: {}; caused by: {}",
+                            hql.toString(),
+                            res.toString());
+                }
+            } catch (Exception ex) {
+                // Do not let a Hive exception stop HiveProducer from writing the file; catch and report it here
+                logger.error("create partition failed, please create it manually: " + hql, ex);
+            } finally {
+                if (session != null) {
+                    session.close();
+                }
+                if (driver != null) {
+                    driver.close();
+                }
             }
-            session.close();
-            driver.close();
         }
 
         // Step 3: create path for new partition if it is the first time write metrics message or new partition should be used
@@ -195,7 +232,21 @@ public class HiveProducer {
                 closeFout();
             }
 
-            Path partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%04d", id));
+            Path partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%05d", id));
+
+            // Do not overwrite existing files when supportAppend is false
+            int nCheck = 0;
+            while (!supportAppend && fs.exists(partitionContentPath)) {
+                id++;
+                nCheck++;
+                partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%05d", id));
+                logger.debug("{} exists, skip it.", partitionContentPath);
+                if (nCheck > 100000) {
+                    logger.warn("Exceeded max check times.");
+                    break;
+                }
+            }
+
             logger.info("Try to use new partition content path: {} for metric: {}", partitionContentPath, metricType);
             if (!fs.exists(partitionContentPath)) {
                 int nRetry = 0;
@@ -210,30 +261,38 @@ public class HiveProducer {
                             "Fail to create HDFS file: " + partitionContentPath + " after " + nRetry + " retries");
                 }
             }
-            fout = fs.append(partitionContentPath);
+            if (supportAppend) {
+                fout = fs.append(partitionContentPath);
+            } else {
+                fout = fs.create(partitionContentPath);
+            }
             prePartitionPath = partitionPath.toString();
             curPartitionContentPath = partitionContentPath;
-            id = (id + 1) % 10;
+            id = (id + 1) % (supportAppend ? 10 : 100000);
         }
 
-        // Step 4: append record to HDFS without flush
+        // Step 4: append records to DFS
         try {
             int count = 0;
             for (HiveProducerRecord elem : recordItr) {
                 fout.writeBytes(elem.valueToString() + "\n");
                 count++;
             }
-            logger.info("Success to write {} metrics ({}) to file {}", count, metricType, curPartitionContentPath);
+            logger.debug("Successfully wrote {} metrics ({}) to file {}", count, metricType, curPartitionContentPath);
         } catch (IOException e) {
             logger.error("Fails to write metrics(" + metricType + ") to file " + curPartitionContentPath.toString()
                     + " due to ", e);
             closeFout();
         }
+        if (closeFileEveryAppend) {
+            closeFout();
+        }
     }
 
     private void closeFout() {
         if (fout != null) {
             try {
+                logger.debug("Flushing and closing output stream {}.", curPartitionContentPath);
                 fout.close();
             } catch (Exception e) {
                 logger.error("Close the path: " + curPartitionContentPath + " failed", e);
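
The constructor change above routes every property whose key starts with "kylin." into
kylinSpecifiedConfig instead of the HiveConf. A hedged usage sketch (the metric type
"QUERY" and the hive.exec.dynamic.partition key are example values, not defaults):

    // Illustrative wiring only; the constructor signature (String, Properties)
    // throws Exception, as in the patched class.
    import java.util.Properties;

    import org.apache.kylin.metrics.lib.impl.hive.HiveProducer;

    public final class HiveProducerConfigExample {
        public static HiveProducer build() throws Exception {
            Properties props = new Properties();
            // "kylin."-prefixed keys land in kylinSpecifiedConfig, not the HiveConf:
            props.setProperty("kylin.hive.producer.close-file-every-append", "true");
            // Any other key is still applied to the HiveConf:
            props.setProperty("hive.exec.dynamic.partition", "true");
            return new HiveProducer("QUERY", props);
        }
    }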
diff --git a/server/src/main/resources/kylinMetrics.xml b/server/src/main/resources/kylinMetrics.xml
index 843fb91..85c879f 100644
--- a/server/src/main/resources/kylinMetrics.xml
+++ b/server/src/main/resources/kylinMetrics.xml
@@ -73,6 +73,7 @@
                                           value="org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter"/>
                                 <property name="second">
                                     <props>
+                                        <prop key="kylin.hive.producer.close-file-every-append">true</prop>
                                     </props>
                                 </property>
                             </bean>
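
The kylinMetrics.xml change above enables close-file-every-append out of the box. For
reference, a hedged illustration of the content-file naming and id rollover the Java
change uses (the %05d format and the modulus are copied from the diff; the host name,
loop, and printouts are illustrative):

    // Shows how content file names cycle when append is unsupported.
    import java.util.Locale;

    public final class FileNameRollover {
        public static void main(String[] args) {
            String contentFilePrefix = "host01-" + System.currentTimeMillis() + "-part-";
            boolean supportAppend = false; // e.g. an S3-backed warehouse
            int id = 0;
            for (int i = 0; i < 3; i++) {
                // %05d zero-padding matches the patched HiveProducer
                String name = contentFilePrefix + String.format(Locale.ROOT, "%05d", id);
                System.out.println(name); // host01-<ts>-part-00000, -00001, -00002
                // Rollover: 10 names when appending, 100000 when creating new files
                id = (id + 1) % (supportAppend ? 10 : 100000);
            }
        }
    }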