You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2017/02/24 23:47:07 UTC
hive git commit: HIVE-15951 : Make sure base persist directory is unique and deleted (Slim Bouguerra via Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master 7fd5ba53a -> 2f6f6bda2
HIVE-15951 : Make sure base persist directory is unique and deleted (Slim Bouguerra via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2f6f6bda
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2f6f6bda
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2f6f6bda
Branch: refs/heads/master
Commit: 2f6f6bda2fa2434bfcfc7cdb2d3059045568ab1a
Parents: 7fd5ba5
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Thu Feb 16 14:27:00 2017 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Fri Feb 24 15:45:56 2017 -0800
----------------------------------------------------------------------
.../hadoop/hive/druid/io/DruidRecordWriter.java | 25 ++++++++++++++------
1 file changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2f6f6bda/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index 3323cc0..e2c5b9d 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -42,6 +42,7 @@ import io.druid.segment.realtime.plumber.Committers;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.Constants;
@@ -57,9 +58,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritable>,
@@ -90,16 +93,19 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
final Path segmentsDescriptorsDir,
final FileSystem fileSystem
) {
+ File basePersistDir = new File(realtimeTuningConfig.getBasePersistDirectory(),
+ UUID.randomUUID().toString()
+ );
this.tuningConfig = Preconditions
- .checkNotNull(realtimeTuningConfig, "realtimeTuningConfig is null");
+ .checkNotNull(realtimeTuningConfig.withBasePersistDirectory(basePersistDir),
+ "realtimeTuningConfig is null"
+ );
this.dataSchema = Preconditions.checkNotNull(dataSchema, "data schema is null");
+
appenderator = Appenderators
- .createOffline(this.dataSchema,
- tuningConfig,
- new FireDepartmentMetrics(), dataSegmentPusher,
- DruidStorageHandlerUtils.JSON_MAPPER,
- DruidStorageHandlerUtils.INDEX_IO,
- DruidStorageHandlerUtils.INDEX_MERGER_V9
+ .createOffline(this.dataSchema, tuningConfig, new FireDepartmentMetrics(),
+ dataSegmentPusher, DruidStorageHandlerUtils.JSON_MAPPER,
+ DruidStorageHandlerUtils.INDEX_IO, DruidStorageHandlerUtils.INDEX_MERGER_V9
);
Preconditions.checkArgument(maxPartitionSize > 0, "maxPartitionSize need to be greater than 0");
this.maxPartitionSize = maxPartitionSize;
@@ -260,6 +266,11 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
} catch (InterruptedException e) {
Throwables.propagate(e);
} finally {
+ try {
+ FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory());
+ } catch (Exception e){
+ LOG.error("error cleaning of base persist directory", e);
+ }
appenderator.close();
}
}