Posted to commits@phoenix.apache.org by st...@apache.org on 2023/12/15 05:17:26 UTC

(phoenix) branch 5.1 updated: PHOENIX-6721 CSV bulkload tool fails with FileNotFoundException if --output points to the S3 location

This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.1 by this push:
     new 6fd287a08e PHOENIX-6721 CSV bulkload tool fails with FileNotFoundException if --output points to the S3 location
6fd287a08e is described below

commit 6fd287a08e55a2bd06c15a543c4c211b5e815986
Author: Sergey Soldatov <ss...@apache.org>
AuthorDate: Tue May 31 13:37:20 2022 -0700

    PHOENIX-6721 CSV bulkload tool fails with FileNotFoundException if --output points to the S3 location
    
    Co-authored-by: Istvan Toth <st...@apache.org>
---
 .../phoenix/mapreduce/MultiHfileOutputFormat.java  | 25 +++++++++++-----------
 1 file changed, 13 insertions(+), 12 deletions(-)

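For context, the change below stops constructing a FileOutputCommitter by hand and instead takes the HFile work directory from whatever PathOutputCommitter the job is actually configured with (for example, an S3A committer when --output points at S3). A minimal sketch of that pattern follows; the class and method names (WorkPathExample.workPath) are illustrative only and are not part of the Phoenix code:

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;

    final class WorkPathExample {

        // Old approach (hypothetical, mirrors the removed lines): always build a
        // FileOutputCommitter under the output path, whose _temporary work
        // directory may not resolve on object stores such as S3.
        //
        //   Path outputPath = FileOutputFormat.getOutputPath(context);
        //   Path workDir = new FileOutputCommitter(outputPath, context).getWorkPath();

        // New approach: reuse the committer the OutputFormat already created and
        // ask it for its work path, whatever committer implementation that is.
        static Path workPath(OutputCommitter committer) throws IOException {
            if (!(committer instanceof PathOutputCommitter)) {
                throw new IOException("Expected a PathOutputCommitter, got "
                    + committer.getClass().getName());
            }
            return ((PathOutputCommitter) committer).getWorkPath();
        }
    }
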
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
index 3a9071e123..a027f00400 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
@@ -67,10 +67,11 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.phoenix.compat.hbase.CompatUtil;
 import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
@@ -113,7 +114,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
     @Override
     public RecordWriter<TableRowkeyPair, Cell> getRecordWriter(TaskAttemptContext context)
             throws IOException, InterruptedException {
-        return createRecordWriter(context);
+        return createRecordWriter(context, this.getOutputCommitter(context));
     }
 
     /**
@@ -122,11 +123,11 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
      * @return
      * @throws IOException 
      */
-    static <V extends Cell> RecordWriter<TableRowkeyPair, V> createRecordWriter(final TaskAttemptContext context)
+    static <V extends Cell> RecordWriter<TableRowkeyPair, V> createRecordWriter(
+        final TaskAttemptContext context, final OutputCommitter committer)
             throws IOException {
         // Get the path of the temporary output file
-        final Path outputPath = FileOutputFormat.getOutputPath(context);
-        final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
+        final Path outputdir = ((PathOutputCommitter) committer).getWorkPath();
         final Configuration conf = context.getConfiguration();
         final FileSystem fs = outputdir.getFileSystem(conf);
      
@@ -336,7 +337,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
     static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf,final String tableName) {
         Map<byte[], Algorithm> compressionMap = new TreeMap<byte[],Algorithm>(Bytes.BYTES_COMPARATOR);
         Map<String, String> tableConfigs = getTableConfigurations(conf, tableName);
-        if(tableConfigs == null) {
+        if (tableConfigs == null) {
             return compressionMap;
         }
         Map<byte[], String> stringMap = createFamilyConfValueMap(tableConfigs,COMPRESSION_FAMILIES_CONF_KEY);
@@ -355,7 +356,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
      */
     private static Map<String, String> getTableConfigurations(Configuration conf, final String tableName) {
         String tableDefn = conf.get(tableName);
-        if(StringUtils.isEmpty(tableDefn)) {
+        if (StringUtils.isEmpty(tableDefn)) {
             return null;
         }
         TargetTableRef table = TargetTableRefFunctions.FROM_JSON.apply(tableDefn);
@@ -374,7 +375,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
     static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf,final String tableName) {
         Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[],BloomType>(Bytes.BYTES_COMPARATOR);
         Map<String, String> tableConfigs = getTableConfigurations(conf, tableName);
-        if(tableConfigs == null) {
+        if (tableConfigs == null) {
             return bloomTypeMap;
         }
         Map<byte[], String> stringMap = createFamilyConfValueMap(tableConfigs,BLOOM_TYPE_FAMILIES_CONF_KEY);
@@ -396,7 +397,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
     static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf,final String tableName) {
         Map<byte[], Integer> blockSizeMap = new TreeMap<byte[],Integer>(Bytes.BYTES_COMPARATOR);
         Map<String, String> tableConfigs = getTableConfigurations(conf, tableName);
-        if(tableConfigs == null) {
+        if (tableConfigs == null) {
             return blockSizeMap;
         }
         Map<byte[], String> stringMap = createFamilyConfValueMap(tableConfigs,BLOCK_SIZE_FAMILIES_CONF_KEY);
@@ -420,7 +421,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
         
         Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[],DataBlockEncoding>(Bytes.BYTES_COMPARATOR);
         Map<String, String> tableConfigs = getTableConfigurations(conf, tableName);
-        if(tableConfigs == null) {
+        if (tableConfigs == null) {
             return encoderMap;
         }
         Map<byte[], String> stringMap = createFamilyConfValueMap(tableConfigs,DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
@@ -441,7 +442,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
     private static Map<byte[], String> createFamilyConfValueMap(Map<String,String> configs, String confName) {
         Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
         String confVal = configs.get(confName);
-        if(StringUtils.isEmpty(confVal)) {
+        if (StringUtils.isEmpty(confVal)) {
             return confValMap;
         }
         for (String familyConf : confVal.split("&")) {
@@ -677,7 +678,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce
 
         // tableStartKeys for all tables.
         Set<TableRowkeyPair> tablesStartKeys = Sets.newTreeSet();
-        for(TargetTableRef table : tablesToBeLoaded) {
+        for (TargetTableRef table : tablesToBeLoaded) {
            final String tableName = table.getPhysicalName();
            try(Connection hbaseConn = ConnectionFactory.createConnection(conf);){
                 Set<TableRowkeyPair> startKeys =