You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/05/06 11:39:49 UTC
[kylin] branch 2.6.x updated: KYLIN-4385 HiveSink support object storage
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/2.6.x by this push:
new 1258843 KYLIN-4385 HiveSink support object storage
1258843 is described below
commit 125884347dad6859782f156685b6784e788e2a04
Author: XiaoxiangYu <hi...@126.com>
AuthorDate: Wed Apr 8 19:07:30 2020 +0800
KYLIN-4385 HiveSink support object storage
---
.../kylin/metrics/lib/impl/hive/HiveProducer.java | 110 ++++++++++++++++-----
server/src/main/resources/kylinMetrics.xml | 7 ++
2 files changed, 93 insertions(+), 24 deletions(-)
diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
index 8062b12..72121a9 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
@@ -14,7 +14,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.metrics.lib.impl.hive;
@@ -59,9 +59,16 @@ public class HiveProducer {
private static final int CACHE_MAX_SIZE = 10;
private final HiveConf hiveConf;
- private final FileSystem hdfs;
+ private final FileSystem fileSystem;
private final LoadingCache<Pair<String, String>, Pair<String, List<FieldSchema>>> tableFieldSchemaCache;
private final String CONTENT_FILE_NAME;
+ private FSDataOutputStream fout;
+ private long partId = 0;
+
+ /**
+ * Some cloud file systems, such as AWS S3, do not support appending to an existing file.
+ */
+ private final boolean supportAppend;
public HiveProducer(Properties props) throws Exception {
this(props, new HiveConf());
@@ -73,7 +80,7 @@ public class HiveProducer {
hiveConf.set(e.getKey().toString(), e.getValue().toString());
}
- hdfs = FileSystem.get(hiveConf);
+ fileSystem = FileSystem.get(hiveConf);
tableFieldSchemaCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<Pair<String, String>, Pair<String, List<FieldSchema>>>() {
@Override
@@ -85,6 +92,7 @@ public class HiveProducer {
public Pair<String, List<FieldSchema>> load(Pair<String, String> tableName) throws Exception {
HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(hiveConf);
String tableLocation = metaStoreClient.getTable(tableName.getFirst(), tableName.getSecond()).getSd().getLocation();
+ logger.debug("Find table location for {} at {}", tableName.getSecond(), tableLocation);
List<FieldSchema> fields = metaStoreClient.getFields(tableName.getFirst(), tableName.getSecond());
metaStoreClient.close();
return new Pair(tableLocation, fields);
@@ -97,7 +105,10 @@ public class HiveProducer {
} catch (UnknownHostException e) {
hostName = "UNKNOWN";
}
- CONTENT_FILE_NAME = hostName + "-part-0000";
+ CONTENT_FILE_NAME = hostName + "-part-";
+ String fsUri = fileSystem.getUri().toString();
+ supportAppend = fsUri.startsWith("hdfs"); // Only HDFS support append
+ logger.info("For {}, supportAppend was set to {}.", fsUri, supportAppend);
}
public void close() {
@@ -114,7 +125,7 @@ public class HiveProducer {
for (Record record : recordList) {
HiveProducerRecord hiveRecord = convertTo(record);
if (recordMap.get(hiveRecord.key()) == null) {
- recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord> newLinkedList());
+ recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord>newLinkedList());
}
recordMap.get(hiveRecord.key()).add(hiveRecord);
}
@@ -125,6 +136,7 @@ public class HiveProducer {
}
private void write(RecordKey recordKey, Iterable<HiveProducerRecord> recordItr) throws Exception {
+ // Step 1: determine partitionPath from the record's RecordKey
String tableLocation = tableFieldSchemaCache.get(new Pair<>(recordKey.database(), recordKey.table())).getFirst();
StringBuilder sb = new StringBuilder();
sb.append(tableLocation);
@@ -135,7 +147,9 @@ public class HiveProducer {
sb.append(e.getValue());
}
Path partitionPath = new Path(sb.toString());
- if (!hdfs.exists(partitionPath)) {
+
+ // Step 2: create the partition path for the Hive table if it does not exist
+ if (!fileSystem.exists(partitionPath)) {
StringBuilder hql = new StringBuilder();
hql.append("ALTER TABLE ");
hql.append(recordKey.database() + "." + recordKey.table());
@@ -151,38 +165,75 @@ public class HiveProducer {
hql.append("='" + e.getValue() + "'");
}
hql.append(")");
- Driver driver = new Driver(hiveConf);
- CliSessionState session = new CliSessionState(hiveConf);
- SessionState.start(session);
- CommandProcessorResponse res = driver.run(hql.toString());
- if (res.getResponseCode() != 0) {
- logger.warn("Fail to add partition. HQL: {}; Cause by: {}",
- hql.toString(),
- res.toString());
+ logger.debug("Create partition by {}.", hql);
+ try {
+ Driver driver = new Driver(hiveConf);
+ CliSessionState session = new CliSessionState(hiveConf);
+ SessionState.start(session);
+ CommandProcessorResponse res = driver.run(hql.toString());
+ if (res.getResponseCode() != 0) {
+ logger.warn("Fail to add partition. HQL: {}; Cause by: {}",
+ hql.toString(),
+ res.toString());
+ }
+ session.close();
+ driver.close();
+ } catch (Exception e) {
+ logger.error("Create hive partition failed.", e);
}
- session.close();
- driver.close();
}
- Path partitionContentPath = new Path(partitionPath, CONTENT_FILE_NAME);
- if (!hdfs.exists(partitionContentPath)) {
+
+ // Step 3: create the file if this is the first write of metrics messages or a new partition must be used
+ Path partitionContentPath = new Path(partitionPath, CONTENT_FILE_NAME + String.format(Locale.ROOT, "%05d", partId));
+
+ // Do not overwrite existing files when supportAppend is false
+ int nCheck = 0;
+ while (!supportAppend && fileSystem.exists(partitionContentPath)) {
+ partId++;
+ nCheck++;
+ partitionContentPath = new Path(partitionPath, CONTENT_FILE_NAME + String.format(Locale.ROOT, "%05d", partId));
+ if (nCheck > 100000) {
+ logger.warn("Exceed max check times.");
+ break;
+ }
+ }
+
+ if (!fileSystem.exists(partitionContentPath)) {
int nRetry = 0;
- while (!hdfs.createNewFile(partitionContentPath) && nRetry++ < 5) {
- if (hdfs.exists(partitionContentPath)) {
+ while (!fileSystem.createNewFile(partitionContentPath) && nRetry++ < 5) {
+ if (fileSystem.exists(partitionContentPath)) {
break;
}
Thread.sleep(500L * nRetry);
}
- if (!hdfs.exists(partitionContentPath)) {
+ if (!fileSystem.exists(partitionContentPath)) {
throw new RuntimeException("Fail to create HDFS file: " + partitionContentPath + " after " + nRetry + " retries");
}
+ partId = (partId + 1) % 100000;
}
- try (FSDataOutputStream fout = hdfs.append(partitionContentPath)) {
+
+ if (supportAppend) {
+ fout = fileSystem.append(partitionContentPath);
+ } else {
+ fout = fileSystem.create(partitionContentPath);
+ }
+
+ // Step 4: Write records to Distributed FileSystem
+ try {
+ int count = 0;
for (HiveProducerRecord elem : recordItr) {
fout.writeBytes(elem.valueToString() + "\n");
+ count++;
}
+ logger.info("Success to write {} metrics {} to file {}.", count, recordKey.table(), partitionContentPath);
} catch (IOException e) {
- System.out.println("Fails to write metrics to file " + partitionContentPath.toString() + " due to " + e);
- logger.error("Fails to write metrics to file " + partitionContentPath.toString() + " due to " + e);
+ logger.error("Fails to write metrics(" + ") to file " + partitionContentPath.toString()
+ + " due to ", e);
+ closeFout();
+ }
+
+ if (!supportAppend) {
+ closeFout();
}
}
@@ -207,4 +258,15 @@ public class HiveProducer {
return new HiveProducerRecord(tableNameSplits.getFirst(), tableNameSplits.getSecond(), partitionKVs, columnValues);
}
+ private void closeFout() {
+ if (fout != null) {
+ try {
+ logger.info("Flush output stream.");
+ fout.close();
+ } catch (Exception e) {
+ logger.error("Close the path failed", e);
+ }
+ }
+ fout = null;
+ }
}
diff --git a/server/src/main/resources/kylinMetrics.xml b/server/src/main/resources/kylinMetrics.xml
index 354b1a0..0e81262 100644
--- a/server/src/main/resources/kylinMetrics.xml
+++ b/server/src/main/resources/kylinMetrics.xml
@@ -19,15 +19,22 @@
<description>Kylin Metrics Related Configuration</description>
+ <!-- A Reservoir that does not stage/cache metrics messages at all; it emits each one immediately. May be useful for debugging or for a Kafka sink. -->
<bean id="instantReservoir" class="org.apache.kylin.metrics.lib.impl.InstantReservoir"/>
+ <!-- A Reservoir that stages metrics messages in memory and emits them at a fixed rate. -->
<bean id="blockingReservoir" class="org.apache.kylin.metrics.lib.impl.BlockingReservoir">
+ <!-- minReportSize: the Reservoir tries to write messages only when the current count of staged messages exceeds minReportSize. -->
<constructor-arg index="0">
<value>10</value>
</constructor-arg>
+
+ <!-- maxReportSize, max size of report in one time -->
<constructor-arg index="1">
<value>10</value>
</constructor-arg>
+
+ <!-- minReportTime: minimum duration (in minutes) between two report actions. -->
<constructor-arg index="2">
<value>10</value>
</constructor-arg>