Posted to commits@kylin.apache.org by ni...@apache.org on 2020/05/06 11:54:23 UTC
[kylin] 01/02: KYLIN-4385 Fix HiveProducer can not write to Hive Table[AWS Azure]
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch 3.0.x
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 619e05e8001f6fa0c0927e16eab650ff87787ef9
Author: XiaoxiangYu <hi...@126.com>
AuthorDate: Wed Apr 1 18:32:14 2020 +0800
KYLIN-4385 Fix HiveProducer can not write to Hive Table[AWS Azure]
---
.../kylin/metrics/lib/impl/hive/HiveProducer.java | 77 +++++++++++++++++-----
1 file changed, 61 insertions(+), 16 deletions(-)
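The gist of the change: HiveProducer previously assumed the metrics table lived on HDFS and always used FileSystem.append(), which fails on object stores such as AWS S3 and Azure Blob Storage (wasb). The patch detects whether the target filesystem supports append and, if not, writes each batch to a fresh file. A minimal sketch of that capability check, assuming only the Hadoop FileSystem API (class and method names here are illustrative, not part of the patch):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class AppendSupportCheck {
        // Illustrative: infer append support from the filesystem URI scheme.
        public static boolean supportsAppend(FileSystem fs) {
            URI uri = fs.getUri();
            String scheme = uri.getScheme();           // e.g. "hdfs", "s3a", "wasb"
            return scheme != null
                    && !scheme.startsWith("s3")        // AWS EMR: s3, s3a, s3n
                    && !scheme.startsWith("wasb");     // Azure HDInsight: wasb, wasbs
        }

        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            System.out.println("append supported: " + supportsAppend(fs));
        }
    }

On HDFS this yields true; on the default filesystem of an EMR or HDInsight cluster it yields false, which switches the producer into create-per-batch mode.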
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
index 5082b6a..ae10a93 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
@@ -66,6 +66,11 @@ public class HiveProducer {
private Path curPartitionContentPath;
private int id = 0;
private FSDataOutputStream fout;
+ /**
+ * Some cloud file systems, such as AWS S3, do not support appending to an existing file.
+ * When append is not supported, a new file is created on each call to the write method.
+ */
+ private final boolean supportAppend;
public HiveProducer(String metricType, Properties props) throws Exception {
this(metricType, props, new HiveConf());
@@ -96,6 +101,7 @@ public class HiveProducer {
HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(hiveConf);
String tableLocation = metaStoreClient.getTable(tableName.getFirst(), tableName.getSecond())
.getSd().getLocation();
+ logger.debug("Find table location for {} at {}", tableName.getSecond(), tableLocation);
List<FieldSchema> fields = metaStoreClient.getFields(tableName.getFirst(),
tableName.getSecond());
metaStoreClient.close();
@@ -110,6 +116,9 @@ public class HiveProducer {
hostName = "UNKNOWN";
}
contentFilePrefix = hostName + "-" + System.currentTimeMillis() + "-part-";
+ String fsUri = fs.getUri().toString();
+ supportAppend = !fsUri.startsWith("s3") && !fsUri.startsWith("wasb"); // AWS EMR and Azure HDInsight
+ logger.info("For {}, supportAppend was set to {}", fsUri, supportAppend);
}
public void close() {
@@ -126,7 +135,7 @@ public class HiveProducer {
for (Record record : recordList) {
HiveProducerRecord hiveRecord = convertTo(record);
if (recordMap.get(hiveRecord.key()) == null) {
- recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord> newLinkedList());
+ recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord>newLinkedList());
}
recordMap.get(hiveRecord.key()).add(hiveRecord);
}
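Context for the hunk above: incoming records are bucketed by their partition key so that each bucket can later be flushed to the matching partition path in one pass. An equivalent idiom with computeIfAbsent (a sketch, not the patch; the key type is shown as Object for brevity):

    // Group records by partition key before writing each group out.
    Map<Object, List<HiveProducerRecord>> byKey = new HashMap<>();
    for (HiveProducerRecord rec : recordList) {
        byKey.computeIfAbsent(rec.key(), k -> new LinkedList<>()).add(rec);
    }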
@@ -174,17 +183,31 @@ public class HiveProducer {
}
hql.append(")");
logger.debug("create partition by {}.", hql);
- Driver driver = new Driver(hiveConf);
- CliSessionState session = new CliSessionState(hiveConf);
- SessionState.start(session);
- CommandProcessorResponse res = driver.run(hql.toString());
- if (res.getResponseCode() != 0) {
- logger.warn("Fail to add partition. HQL: {}; Cause by: {}",
- hql.toString(),
- res.toString());
+ Driver driver = null;
+ CliSessionState session = null;
+ try {
+ driver = new Driver(hiveConf);
+ session = new CliSessionState(hiveConf);
+ SessionState.start(session);
+ CommandProcessorResponse res = driver.run(hql.toString());
+ if (res.getResponseCode() != 0) {
+ logger.warn("Fail to add partition. HQL: {}; Cause by: {}",
+ hql.toString(),
+ res.toString());
+ }
+ } catch (Exception ex) {
+ // Do not let a Hive exception stop HiveProducer from writing the file, so catch and report it here
+ logger.error("Create partition failed, please create it manually: " + hql, ex);
+ } finally {
+ if (session != null) {
+ session.close();
+ }
+ if (driver != null) {
+ driver.close();
+ }
}
- session.close();
- driver.close();
}
// Step 3: create path for new partition if it is the first time write metrics message or new partition should be used
@@ -194,7 +217,21 @@ public class HiveProducer {
closeFout();
}
- Path partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%04d", id));
+ Path partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%05d", id));
+
+ // Do not overwrite existing files when supportAppend is set to false
+ int nCheck = 0;
+ while (!supportAppend && fs.exists(partitionContentPath)) {
+ id++;
+ nCheck++;
+ partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%05d", id));
+ logger.debug("{} exists, skip it.", partitionContentPath);
+ if (nCheck > 100000) {
+ logger.warn("Exceed max check times.");
+ break;
+ }
+ }
+
logger.info("Try to use new partition content path: {} for metric: {}", partitionContentPath, metricType);
if (!fs.exists(partitionContentPath)) {
int nRetry = 0;
@@ -209,30 +246,38 @@ public class HiveProducer {
"Fail to create HDFS file: " + partitionContentPath + " after " + nRetry + " retries");
}
}
- fout = fs.append(partitionContentPath);
+ if (supportAppend) {
+ fout = fs.append(partitionContentPath);
+ } else {
+ fout = fs.create(partitionContentPath);
+ }
prePartitionPath = partitionPath.toString();
curPartitionContentPath = partitionContentPath;
- id = (id + 1) % 10;
+ id = (id + 1) % (supportAppend ? 10 : 100000);
}
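Two details in the hunks above work together: the part-file suffix was widened from %04d to %05d, and the id counter now wraps at 100000 instead of 10 when append is unsupported, so each host/timestamp prefix can address 100,000 distinct file names before the collision probe gives up. A sketch of the resulting name generation (fragment; variable names follow the patch):

    // e.g. "ip-10-0-0-1-1585738334000-part-00042"
    String name = hostName + "-" + startMillis + "-part-"
            + String.format(Locale.ROOT, "%05d", id);
    id = (id + 1) % (supportAppend ? 10 : 100000);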
- // Step 4: append record to HDFS without flush
+ // Step 4: append records to the DFS
try {
int count = 0;
for (HiveProducerRecord elem : recordItr) {
fout.writeBytes(elem.valueToString() + "\n");
count++;
}
- logger.info("Success to write {} metrics ({}) to file {}", count, metricType, curPartitionContentPath);
+ logger.debug("Success to write {} metrics ({}) to file {}", count, metricType, curPartitionContentPath);
} catch (IOException e) {
logger.error("Fails to write metrics(" + metricType + ") to file " + curPartitionContentPath.toString()
+ " due to ", e);
closeFout();
}
+ if (!supportAppend) {
+ closeFout();
+ }
}
private void closeFout() {
if (fout != null) {
try {
+ logger.debug("Flush output stream {}.", curPartitionContentPath);
fout.close();
} catch (Exception e) {
logger.error("Close the path: " + curPartitionContentPath + " failed", e);