You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/01/17 15:17:50 UTC
[kylin] 04/07: KYLIN-4573 Add option to indicate whether to close
file for every append for Hive Producer
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 3400338d7870c843e5ab2490c6767aa37e8092e4
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Mon Jun 15 16:41:36 2020 +0800
KYLIN-4573 Add option to indicate whether to close file for every append for Hive Producer
(cherry picked from commit 616e06675278a6857f3cbb353a4f9c2243eeccc1)
---
.../kylin/metrics/lib/impl/hive/HiveProducer.java | 93 ++++++++++++++++++----
server/src/main/resources/kylinMetrics.xml | 1 +
2 files changed, 77 insertions(+), 17 deletions(-)
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
index a96b261..8bc7a43 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -66,6 +67,15 @@ public class HiveProducer {
private Path curPartitionContentPath;
private int id = 0;
private FSDataOutputStream fout;
+ /**
+ * Some cloud file systems, such as AWS S3, do not support appending to an existing file.
+ * When append is not supported, a new file will be produced on each call to the write method.
+ */
+ private final boolean supportAppend;
+
+ private final boolean closeFileEveryAppend;
+
+ private final Map<String, String> kylinSpecifiedConfig = new HashMap<>();
public HiveProducer(String metricType, Properties props) throws Exception {
this(metricType, props, new HiveConf());
@@ -75,7 +85,13 @@ public class HiveProducer {
this.metricType = metricType;
hiveConf = hiveConfig;
for (Map.Entry<Object, Object> e : props.entrySet()) {
- hiveConf.set(e.getKey().toString(), e.getValue().toString());
+ String key = e.getKey().toString();
+ String value = e.getValue().toString();
+ if (key.startsWith("kylin.")) {
+ kylinSpecifiedConfig.put(key, value);
+ } else {
+ hiveConf.set(key, value);
+ }
}
fs = FileSystem.get(hiveConf);
@@ -96,6 +112,7 @@ public class HiveProducer {
IMetaStoreClient metaStoreClient = HiveMetaStoreClientFactory.getHiveMetaStoreClient(hiveConf);
String tableLocation = metaStoreClient.getTable(tableName.getFirst(), tableName.getSecond())
.getSd().getLocation();
+ logger.debug("Find table location for {} at {}", tableName.getSecond(), tableLocation);
List<FieldSchema> fields = metaStoreClient.getFields(tableName.getFirst(),
tableName.getSecond());
metaStoreClient.close();
@@ -110,6 +127,12 @@ public class HiveProducer {
hostName = "UNKNOWN";
}
contentFilePrefix = hostName + "-" + System.currentTimeMillis() + "-part-";
+ String fsUri = fs.getUri().toString();
+ supportAppend = fsUri.startsWith("hdfs") ; // Only HDFS is appendable
+ logger.info("For {}, supportAppend was set to {}", fsUri, supportAppend);
+
+ closeFileEveryAppend = !supportAppend
+ || Boolean.parseBoolean(kylinSpecifiedConfig.get("kylin.hive.producer.close-file-every-append"));
}
public void close() {
@@ -127,7 +150,7 @@ public class HiveProducer {
for (Record record : recordList) {
HiveProducerRecord hiveRecord = convertTo(record);
if (recordMap.get(hiveRecord.key()) == null) {
- recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord> newLinkedList());
+ recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord>newLinkedList());
}
recordMap.get(hiveRecord.key()).add(hiveRecord);
}
@@ -175,17 +198,31 @@ public class HiveProducer {
}
hql.append(")");
logger.debug("create partition by {}.", hql);
- Driver driver = new Driver(hiveConf);
- CliSessionState session = new CliSessionState(hiveConf);
- SessionState.start(session);
- CommandProcessorResponse res = driver.run(hql.toString());
- if (res.getResponseCode() != 0) {
- logger.warn("Fail to add partition. HQL: {}; Cause by: {}",
- hql.toString(),
- res.toString());
+ Driver driver = null;
+ CliSessionState session = null;
+ try {
+ driver = new Driver(hiveConf);
+ session = new CliSessionState(hiveConf);
+ SessionState.start(session);
+ CommandProcessorResponse res = driver.run(hql.toString());
+ if (res.getResponseCode() != 0) {
+ logger.warn("Fail to add partition. HQL: {}; Cause by: {}",
+ hql.toString(),
+ res.toString());
+ }
+ session.close();
+ driver.close();
+ } catch (Exception ex) {
+ // Do not let hive exception stop HiveProducer from writing file, so catch and report it here
+ logger.error("create partition failed, please create it manually : " + hql, ex);
+ } finally {
+ if (session != null) {
+ session.close();
+ }
+ if (driver != null) {
+ driver.close();
+ }
}
- session.close();
- driver.close();
}
// Step 3: create path for new partition if it is the first time write metrics message or new partition should be used
@@ -195,7 +232,21 @@ public class HiveProducer {
closeFout();
}
- Path partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%04d", id));
+ Path partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%05d", id));
+
+ // Do not overwrite exist files when supportAppend was set to false
+ int nCheck = 0;
+ while (!supportAppend && fs.exists(partitionContentPath)) {
+ id++;
+ nCheck++;
+ partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%05d", id));
+ logger.debug("{} exists, skip it.", partitionContentPath);
+ if (nCheck > 100000) {
+ logger.warn("Exceed max check times.");
+ break;
+ }
+ }
+
logger.info("Try to use new partition content path: {} for metric: {}", partitionContentPath, metricType);
if (!fs.exists(partitionContentPath)) {
int nRetry = 0;
@@ -210,30 +261,38 @@ public class HiveProducer {
"Fail to create HDFS file: " + partitionContentPath + " after " + nRetry + " retries");
}
}
- fout = fs.append(partitionContentPath);
+ if (supportAppend) {
+ fout = fs.append(partitionContentPath);
+ } else {
+ fout = fs.create(partitionContentPath);
+ }
prePartitionPath = partitionPath.toString();
curPartitionContentPath = partitionContentPath;
- id = (id + 1) % 10;
+ id = (id + 1) % (supportAppend ? 10 : 100000);
}
- // Step 4: append record to HDFS without flush
+ // Step 4: append record to DFS
try {
int count = 0;
for (HiveProducerRecord elem : recordItr) {
fout.writeBytes(elem.valueToString() + "\n");
count++;
}
- logger.info("Success to write {} metrics ({}) to file {}", count, metricType, curPartitionContentPath);
+ logger.debug("Success to write {} metrics ({}) to file {}", count, metricType, curPartitionContentPath);
} catch (IOException e) {
logger.error("Fails to write metrics(" + metricType + ") to file " + curPartitionContentPath.toString()
+ " due to ", e);
closeFout();
}
+ if (closeFileEveryAppend) {
+ closeFout();
+ }
}
private void closeFout() {
if (fout != null) {
try {
+ logger.debug("Flush output stream {}.", curPartitionContentPath);
fout.close();
} catch (Exception e) {
logger.error("Close the path: " + curPartitionContentPath + " failed", e);
diff --git a/server/src/main/resources/kylinMetrics.xml b/server/src/main/resources/kylinMetrics.xml
index 843fb91..85c879f 100644
--- a/server/src/main/resources/kylinMetrics.xml
+++ b/server/src/main/resources/kylinMetrics.xml
@@ -73,6 +73,7 @@
value="org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter"/>
<property name="second">
<props>
+ <prop key="kylin.hive.producer.close-file-every-append">true</prop>
</props>
</property>
</bean>