Posted to commits@kylin.apache.org by ni...@apache.org on 2020/05/06 11:39:49 UTC

[kylin] branch 2.6.x updated: KYLIN-4385 HiveSink support object storage

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/2.6.x by this push:
     new 1258843  KYLIN-4385 HiveSink support object storage
1258843 is described below

commit 125884347dad6859782f156685b6784e788e2a04
Author: XiaoxiangYu <hi...@126.com>
AuthorDate: Wed Apr 8 19:07:30 2020 +0800

    KYLIN-4385 HiveSink support object storage
---
 .../kylin/metrics/lib/impl/hive/HiveProducer.java  | 110 ++++++++++++++++-----
 server/src/main/resources/kylinMetrics.xml         |   7 ++
 2 files changed, 93 insertions(+), 24 deletions(-)
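
The gist of the change: HiveProducer previously always appended to a single
per-host part file, which fails on object stores such as AWS S3 that do not
support append. The producer now checks the FileSystem URI scheme and, when
append is unsupported, rolls forward to a new numbered part file instead. A
minimal sketch of that decision, using only the Hadoop FileSystem API (the
class and method names other than Hadoop's are illustrative, not taken from
the commit):

    import java.util.Locale;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class AppendOrCreateSketch {
        // Open an output stream for the next batch of metrics records.
        public static FSDataOutputStream open(FileSystem fs, Path dir, String host, long partId)
                throws Exception {
            // Heuristic from the commit: only HDFS supports append().
            boolean supportAppend = fs.getUri().toString().startsWith("hdfs");
            Path file = new Path(dir, host + "-part-" + String.format(Locale.ROOT, "%05d", partId));
            if (supportAppend) {
                if (!fs.exists(file)) {
                    fs.createNewFile(file);
                }
                return fs.append(file); // keep appending to the same part file
            }
            // On object storage, never touch an existing object: skip past
            // used part numbers and create a fresh file.
            while (fs.exists(file)) {
                partId++;
                file = new Path(dir, host + "-part-" + String.format(Locale.ROOT, "%05d", partId));
            }
            return fs.create(file);
        }
    }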

diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
index 8062b12..72121a9 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.metrics.lib.impl.hive;
 
@@ -59,9 +59,16 @@ public class HiveProducer {
     private static final int CACHE_MAX_SIZE = 10;
 
     private final HiveConf hiveConf;
-    private final FileSystem hdfs;
+    private final FileSystem fileSystem;
     private final LoadingCache<Pair<String, String>, Pair<String, List<FieldSchema>>> tableFieldSchemaCache;
     private final String CONTENT_FILE_NAME;
+    private FSDataOutputStream fout;
+    private long partId = 0;
+
+    /**
+     * Some cloud file systems, such as AWS S3, do not support appending to an existing file.
+     */
+    private final boolean supportAppend;
 
     public HiveProducer(Properties props) throws Exception {
         this(props, new HiveConf());
@@ -73,7 +80,7 @@ public class HiveProducer {
             hiveConf.set(e.getKey().toString(), e.getValue().toString());
         }
 
-        hdfs = FileSystem.get(hiveConf);
+        fileSystem = FileSystem.get(hiveConf);
 
         tableFieldSchemaCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<Pair<String, String>, Pair<String, List<FieldSchema>>>() {
             @Override
@@ -85,6 +92,7 @@ public class HiveProducer {
             public Pair<String, List<FieldSchema>> load(Pair<String, String> tableName) throws Exception {
                 HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(hiveConf);
                 String tableLocation = metaStoreClient.getTable(tableName.getFirst(), tableName.getSecond()).getSd().getLocation();
+                logger.debug("Find table location for {} at {}", tableName.getSecond(), tableLocation);
                 List<FieldSchema> fields = metaStoreClient.getFields(tableName.getFirst(), tableName.getSecond());
                 metaStoreClient.close();
                 return new Pair(tableLocation, fields);
@@ -97,7 +105,10 @@ public class HiveProducer {
         } catch (UnknownHostException e) {
             hostName = "UNKNOWN";
         }
-        CONTENT_FILE_NAME = hostName + "-part-0000";
+        CONTENT_FILE_NAME = hostName + "-part-";
+        String fsUri = fileSystem.getUri().toString();
+        supportAppend = fsUri.startsWith("hdfs"); // Only HDFS supports append
+        logger.info("For {}, supportAppend was set to {}.", fsUri, supportAppend);
     }
 
     public void close() {
@@ -114,7 +125,7 @@ public class HiveProducer {
         for (Record record : recordList) {
             HiveProducerRecord hiveRecord = convertTo(record);
             if (recordMap.get(hiveRecord.key()) == null) {
-                recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord> newLinkedList());
+                recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord>newLinkedList());
             }
             recordMap.get(hiveRecord.key()).add(hiveRecord);
         }
@@ -125,6 +136,7 @@ public class HiveProducer {
     }
 
     private void write(RecordKey recordKey, Iterable<HiveProducerRecord> recordItr) throws Exception {
+        // Step 1: determine the partition path from the record's RecordKey
         String tableLocation = tableFieldSchemaCache.get(new Pair<>(recordKey.database(), recordKey.table())).getFirst();
         StringBuilder sb = new StringBuilder();
         sb.append(tableLocation);
@@ -135,7 +147,9 @@ public class HiveProducer {
             sb.append(e.getValue());
         }
         Path partitionPath = new Path(sb.toString());
-        if (!hdfs.exists(partitionPath)) {
+
+        // Step 2: create the partition for the Hive table if it does not exist
+        if (!fileSystem.exists(partitionPath)) {
             StringBuilder hql = new StringBuilder();
             hql.append("ALTER TABLE ");
             hql.append(recordKey.database() + "." + recordKey.table());
@@ -151,38 +165,75 @@ public class HiveProducer {
                 hql.append("='" + e.getValue() + "'");
             }
             hql.append(")");
-            Driver driver = new Driver(hiveConf);
-            CliSessionState session = new CliSessionState(hiveConf);
-            SessionState.start(session);
-            CommandProcessorResponse res = driver.run(hql.toString());
-            if (res.getResponseCode() != 0) {
-                logger.warn("Fail to add partition. HQL: {}; Cause by: {}",
-                        hql.toString(),
-                        res.toString());
+            logger.debug("Create partition by {}.", hql);
+            try {
+                Driver driver = new Driver(hiveConf);
+                CliSessionState session = new CliSessionState(hiveConf);
+                SessionState.start(session);
+                CommandProcessorResponse res = driver.run(hql.toString());
+                if (res.getResponseCode() != 0) {
+                    logger.warn("Fail to add partition. HQL: {}; Cause by: {}",
+                            hql.toString(),
+                            res.toString());
+                }
+                session.close();
+                driver.close();
+            } catch (Exception e) {
+                logger.error("Create hive partition failed.", e);
             }
-            session.close();
-            driver.close();
         }
-        Path partitionContentPath = new Path(partitionPath, CONTENT_FILE_NAME);
-        if (!hdfs.exists(partitionContentPath)) {
+
+        // Step 3: create the file if this is the first write of metrics messages or a new part file is needed
+        Path partitionContentPath = new Path(partitionPath, CONTENT_FILE_NAME + String.format(Locale.ROOT, "%05d", partId));
+
+        // Do not overwrite existing files when supportAppend is false
+        int nCheck = 0;
+        while (!supportAppend && fileSystem.exists(partitionContentPath)) {
+            partId++;
+            nCheck++;
+            partitionContentPath = new Path(partitionPath, CONTENT_FILE_NAME + String.format(Locale.ROOT, "%05d", partId));
+            if (nCheck > 100000) {
+                logger.warn("Exceed max check times.");
+                break;
+            }
+        }
+
+        if (!fileSystem.exists(partitionContentPath)) {
             int nRetry = 0;
-            while (!hdfs.createNewFile(partitionContentPath) && nRetry++ < 5) {
-                if (hdfs.exists(partitionContentPath)) {
+            while (!fileSystem.createNewFile(partitionContentPath) && nRetry++ < 5) {
+                if (fileSystem.exists(partitionContentPath)) {
                     break;
                 }
                 Thread.sleep(500L * nRetry);
             }
-            if (!hdfs.exists(partitionContentPath)) {
+            if (!fileSystem.exists(partitionContentPath)) {
                 throw new RuntimeException("Fail to create HDFS file: " + partitionContentPath + " after " + nRetry + " retries");
             }
+            partId = (partId + 1) % 100000;
         }
-        try (FSDataOutputStream fout = hdfs.append(partitionContentPath)) {
+
+        if (supportAppend) {
+            fout = fileSystem.append(partitionContentPath);
+        } else {
+            fout = fileSystem.create(partitionContentPath);
+        }
+
+        // Step 4: write the records to the distributed file system
+        try {
+            int count = 0;
             for (HiveProducerRecord elem : recordItr) {
                 fout.writeBytes(elem.valueToString() + "\n");
+                count++;
             }
+            logger.info("Success to write {} metrics {} to file {}.", count, recordKey.table(), partitionContentPath);
         } catch (IOException e) {
-            System.out.println("Fails to write metrics to file " + partitionContentPath.toString() + " due to " + e);
-            logger.error("Fails to write metrics to file " + partitionContentPath.toString() + " due to " + e);
+            logger.error("Fails to write metrics(" + ") to file " + partitionContentPath.toString()
+                    + " due to ", e);
+            closeFout();
+        }
+
+        if (!supportAppend) {
+            closeFout();
         }
     }
 
@@ -207,4 +258,15 @@ public class HiveProducer {
         return new HiveProducerRecord(tableNameSplits.getFirst(), tableNameSplits.getSecond(), partitionKVs, columnValues);
     }
 
+    private void closeFout() {
+        if (fout != null) {
+            try {
+                logger.info("Flush output stream.");
+                fout.close();
+            } catch (Exception e) {
+                logger.error("Close the path failed", e);
+            }
+        }
+        fout = null;
+    }
 }
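
One consequence of the scheme-based heuristic above is that every non-hdfs
URI, not just S3, takes the create-a-new-file path. A quick illustration
(the bucket and path names are made up):

    // supportAppend is derived purely from the FileSystem URI scheme.
    String[] uris = {"hdfs://nameservice1", "s3a://metrics-bucket", "file:///tmp/metrics"};
    for (String uri : uris) {
        System.out.println(uri + " -> supportAppend=" + uri.startsWith("hdfs"));
    }
    // hdfs://nameservice1 -> supportAppend=true
    // s3a://metrics-bucket -> supportAppend=false
    // file:///tmp/metrics -> supportAppend=false

Note also that partId wraps via (partId + 1) % 100000, matching the %05d
file-name format, so a long-lived producer reuses part numbers after 100000
files; the existence-check loop then skips forward past any that still exist.
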
diff --git a/server/src/main/resources/kylinMetrics.xml b/server/src/main/resources/kylinMetrics.xml
index 354b1a0..0e81262 100644
--- a/server/src/main/resources/kylinMetrics.xml
+++ b/server/src/main/resources/kylinMetrics.xml
@@ -19,15 +19,22 @@
 
     <description>Kylin Metrics Related Configuration</description>
 
+    <!-- A Reservoir which don't staged/cached metrics message at all, emit it in no time. Maybe good for debug purpose or kafka sink.-->
     <bean id="instantReservoir" class="org.apache.kylin.metrics.lib.impl.InstantReservoir"/>
 
+    <!-- A Reservoir which staged metrics message in memory, and emit them in fixed rate. -->
     <bean id="blockingReservoir" class="org.apache.kylin.metrics.lib.impl.BlockingReservoir">
+        <!-- minReportSize, only if currently count of staged message exceed minReportSize, will Reservoir try to write message-->
         <constructor-arg index="0">
             <value>10</value>
         </constructor-arg>
+
+        <!-- maxReportSize, max size of report in one time -->
         <constructor-arg index="1">
             <value>10</value>
         </constructor-arg>
+
+        <!-- minReportTime, min duration(in minute) between two report action-->
         <constructor-arg index="2">
             <value>10</value>
         </constructor-arg>