Posted to commits@phoenix.apache.org by st...@apache.org on 2023/12/15 05:16:37 UTC
(phoenix) branch master updated: PHOENIX-6721 CSV bulkload tool fails with FileNotFoundException if --output points to the S3 location
This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 0ed534f4cf PHOENIX-6721 CSV bulkload tool fails with FileNotFoundException if --output points to the S3 location
0ed534f4cf is described below
commit 0ed534f4cfefb059f5c8633f0db9c4a188ba97df
Author: Sergey Soldatov <ss...@apache.org>
AuthorDate: Tue May 31 13:37:20 2022 -0700
PHOENIX-6721 CSV bulkload tool fails with FileNotFoundException if --output points to the S3 location
Co-authored-by: Istvan Toth <st...@apache.org>
---
.../phoenix/mapreduce/MultiHfileOutputFormat.java | 25 +++++++++++-----------
1 file changed, 13 insertions(+), 12 deletions(-)
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
index 3b2d4c47bf..b792958b7a 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
@@ -68,10 +68,11 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.phoenix.compat.hbase.CompatUtil;
import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
@@ -114,7 +115,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
@Override
public RecordWriter<TableRowkeyPair, Cell> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
- return createRecordWriter(context);
+ return createRecordWriter(context, this.getOutputCommitter(context));
}
/**
@@ -123,11 +124,11 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
* @return
* @throws IOException
*/
- static <V extends Cell> RecordWriter<TableRowkeyPair, V> createRecordWriter(final TaskAttemptContext context)
+ static <V extends Cell> RecordWriter<TableRowkeyPair, V> createRecordWriter(
+ final TaskAttemptContext context, final OutputCommitter committer)
throws IOException {
// Get the path of the temporary output file
- final Path outputPath = FileOutputFormat.getOutputPath(context);
- final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
+ final Path outputdir = ((PathOutputCommitter) committer).getWorkPath();
final Configuration conf = context.getConfiguration();
final FileSystem fs = outputdir.getFileSystem(conf);
@@ -336,7 +337,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf,final String tableName) {
Map<byte[], Algorithm> compressionMap = new TreeMap<byte[],Algorithm>(Bytes.BYTES_COMPARATOR);
Map<String, String> tableConfigs = getTableConfigurations(conf, tableName);
- if(tableConfigs == null) {
+ if (tableConfigs == null) {
return compressionMap;
}
Map<byte[], String> stringMap = createFamilyConfValueMap(tableConfigs,COMPRESSION_FAMILIES_CONF_KEY);
@@ -355,7 +356,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
*/
private static Map<String, String> getTableConfigurations(Configuration conf, final String tableName) {
String tableDefn = conf.get(tableName);
- if(StringUtils.isEmpty(tableDefn)) {
+ if (StringUtils.isEmpty(tableDefn)) {
return null;
}
TargetTableRef table = TargetTableRefFunctions.FROM_JSON.apply(tableDefn);
@@ -374,7 +375,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf,final String tableName) {
Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[],BloomType>(Bytes.BYTES_COMPARATOR);
Map<String, String> tableConfigs = getTableConfigurations(conf, tableName);
- if(tableConfigs == null) {
+ if (tableConfigs == null) {
return bloomTypeMap;
}
Map<byte[], String> stringMap = createFamilyConfValueMap(tableConfigs,BLOOM_TYPE_FAMILIES_CONF_KEY);
@@ -396,7 +397,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf,final String tableName) {
Map<byte[], Integer> blockSizeMap = new TreeMap<byte[],Integer>(Bytes.BYTES_COMPARATOR);
Map<String, String> tableConfigs = getTableConfigurations(conf, tableName);
- if(tableConfigs == null) {
+ if (tableConfigs == null) {
return blockSizeMap;
}
Map<byte[], String> stringMap = createFamilyConfValueMap(tableConfigs,BLOCK_SIZE_FAMILIES_CONF_KEY);
@@ -420,7 +421,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[],DataBlockEncoding>(Bytes.BYTES_COMPARATOR);
Map<String, String> tableConfigs = getTableConfigurations(conf, tableName);
- if(tableConfigs == null) {
+ if (tableConfigs == null) {
return encoderMap;
}
Map<byte[], String> stringMap = createFamilyConfValueMap(tableConfigs,DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
@@ -441,7 +442,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
private static Map<byte[], String> createFamilyConfValueMap(Map<String,String> configs, String confName) {
Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
String confVal = configs.get(confName);
- if(StringUtils.isEmpty(confVal)) {
+ if (StringUtils.isEmpty(confVal)) {
return confValMap;
}
for (String familyConf : confVal.split("&")) {
@@ -677,7 +678,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
// tableStartKeys for all tables.
Set<TableRowkeyPair> tablesStartKeys = Sets.newTreeSet();
- for(TargetTableRef table : tablesToBeLoaded) {
+ for (TargetTableRef table : tablesToBeLoaded) {
final String tableName = table.getPhysicalName();
try(Connection hbaseConn = ConnectionFactory.createConnection(conf);){
Set<TableRowkeyPair> startKeys =
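For completeness, a sketch of the job-side settings under which getOutputCommitter(context) returns something other than a FileOutputCommitter, which is the case the PathOutputCommitter cast in the patch handles. The two property names are the standard Hadoop 3.1+ S3A committer settings, not anything introduced by this patch; the class and job name here are illustrative only:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Illustrative job setup: with these settings, FileOutputFormat creates the
// committer through the S3A committer factory for s3a:// output paths, so
// MultiHfileOutputFormat.getRecordWriter() receives an S3A committer whose
// getWorkPath() points at the committer's real staging directory.
public final class S3ABulkloadJobSetup {

    private S3ABulkloadJobSetup() {
    }

    public static Job createJob() throws IOException {
        Configuration conf = new Configuration();
        // Route committer creation for the s3a:// scheme through the S3A
        // committer factory (Hadoop 3.1+).
        conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
                "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");
        // Choose an S3A committer: "directory", "partitioned" or "magic".
        conf.set("fs.s3a.committer.name", "directory");
        return Job.getInstance(conf, "phoenix-csv-bulkload");
    }
}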