You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/02/21 08:36:12 UTC

[GitHub] [incubator-inlong] KevinWen007 commented on a change in pull request #2615: [INLONG-2614][Sort] Support array and map data structures in Hive sink and ClickHouse sink

KevinWen007 commented on a change in pull request #2615:
URL: https://github.com/apache/incubator-inlong/pull/2615#discussion_r810884341



##########
File path: inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/TextRowWriter.java
##########
@@ -23,46 +23,261 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.Map;
 import java.util.zip.GZIPOutputStream;
+
 import org.anarres.lzo.LzoAlgorithm;
 import org.anarres.lzo.LzoCompressor;
 import org.anarres.lzo.LzoLibrary;
 import org.anarres.lzo.LzopOutputStream;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.shaded.guava18.com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.Row;
 import org.apache.inlong.sort.configuration.Configuration;
 import org.apache.inlong.sort.configuration.Constants;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.TextFileFormat;
 
 public class TextRowWriter implements BulkWriter<Row> {
 
+    private static final String NULL_STRING = "null";
+
+    private static final String ARRAY_SPLITTER = ",";
+    private static final String ARRAY_START_SYMBOL = "[";
+    private static final String ARRAY_END_SYMBOL = "]";
+
+    private static final String MAP_START_SYMBOL = "{";
+    private static final String MAP_END_SYMBOL = "}";
+    private static final String MAP_ENTRY_SPLITTER = ",";
+    private static final String MAP_KEY_VALUE_SPLITTER = "=";
+
     private final OutputStream outputStream;
 
     private final TextFileFormat textFileFormat;
 
     private final int bufferSize;
 
+    private final LogicalType[] fieldTypes;
+
     public TextRowWriter(
             FSDataOutputStream fsDataOutputStream,
             TextFileFormat textFileFormat,
-            Configuration config) throws IOException {
+            Configuration config,
+            LogicalType[] fieldTypes) throws IOException {
         this.bufferSize = checkNotNull(config).getInteger(Constants.SINK_HIVE_TEXT_BUFFER_SIZE);
         this.outputStream = getCompressionOutputStream(checkNotNull(fsDataOutputStream), textFileFormat);
         this.textFileFormat = checkNotNull(textFileFormat);
+        this.fieldTypes = checkNotNull(fieldTypes);
     }
 
     @Override
     public void addElement(Row row) throws IOException {
         for (int i = 0; i < row.getArity(); i++) {
-            outputStream.write(String.valueOf(row.getField(i)).getBytes(StandardCharsets.UTF_8));
+            String fieldStr = convertField(row.getField(i), fieldTypes[i]);
+            outputStream.write(fieldStr.getBytes(StandardCharsets.UTF_8));
             if (i != row.getArity() - 1) {
                 outputStream.write(textFileFormat.getSplitter());
             }
         }
         outputStream.write(10); // start a new line
     }
 
+    @VisibleForTesting
+    static String convertField(Object field, LogicalType fieldType) {
+        if (field == null) {
+            return NULL_STRING;
+        }
+
+        switch (fieldType.getTypeRoot()) {
+            case ARRAY:
+                return convertArray(field, ((ArrayType) fieldType).getElementType());
+            case MAP:
+                return convertMap((Map<?, ?>) field);
+            default:
+                return String.valueOf(field);
+        }
+    }
+
+    private static String convertArray(Object input, LogicalType elementType) {
+        switch (elementType.getTypeRoot()) {
+            case BOOLEAN:
+                return convertBooleanArray(input);
+            case TINYINT:
+                return convertByteArray(input);
+            case SMALLINT:
+                return convertShortArray(input);
+            case INTEGER:
+                return convertIntArray(input);
+            case BIGINT:
+                return convertLongArray(input);
+            case FLOAT:
+                return convertFloatArray(input);
+            case DOUBLE:
+                return convertDoubleArray(input);
+            default:
+                return convertObjectArray((Object[]) input);
+        }
+    }
+
+    private static String convertObjectArray(Object[] objArray) {
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append(ARRAY_START_SYMBOL);
+        for (int i = 0; i < objArray.length; i++) {
+            stringBuilder.append(objArray[i]);
+            if (i != objArray.length - 1) {
+                stringBuilder.append(ARRAY_SPLITTER);
+            }
+        }
+        stringBuilder.append(ARRAY_END_SYMBOL);
+        return stringBuilder.toString();
+    }
+
+    private static String convertBooleanArray(Object input) {
+        if (input instanceof boolean[]) {
+            boolean[] inputArray = (boolean[]) input;
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append(ARRAY_START_SYMBOL);
+            for (int i = 0; i < inputArray.length; i++) {
+                stringBuilder.append(inputArray[i]);
+                if (i != inputArray.length - 1) {
+                    stringBuilder.append(ARRAY_SPLITTER);
+                }
+            }
+            stringBuilder.append(ARRAY_END_SYMBOL);
+            return stringBuilder.toString();
+        } else {
+            return convertObjectArray((Object[]) input);
+        }
+    }
+
+    private static String convertByteArray(Object input) {
+        if (input instanceof byte[]) {
+            byte[] inputArray = (byte[]) input;
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append(ARRAY_START_SYMBOL);
+            for (int i = 0; i < inputArray.length; i++) {
+                stringBuilder.append(inputArray[i]);
+                if (i != inputArray.length - 1) {
+                    stringBuilder.append(ARRAY_SPLITTER);
+                }
+            }
+            stringBuilder.append(ARRAY_END_SYMBOL);
+            return stringBuilder.toString();
+        } else {
+            return convertObjectArray((Object[]) input);
+        }
+    }
+
+    private static String convertShortArray(Object input) {
+        if (input instanceof short[]) {
+            short[] inputArray = (short[]) input;
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append(ARRAY_START_SYMBOL);
+            for (int i = 0; i < inputArray.length; i++) {
+                stringBuilder.append(inputArray[i]);
+                if (i != inputArray.length - 1) {
+                    stringBuilder.append(ARRAY_SPLITTER);
+                }
+            }
+            stringBuilder.append(ARRAY_END_SYMBOL);
+            return stringBuilder.toString();
+        } else {
+            return convertObjectArray((Object[]) input);
+        }
+    }
+
+    private static String convertIntArray(Object input) {
+        if (input instanceof int[]) {
+            int[] inputArray = (int[]) input;
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append(ARRAY_START_SYMBOL);
+            for (int i = 0; i < inputArray.length; i++) {
+                stringBuilder.append(inputArray[i]);
+                if (i != inputArray.length - 1) {
+                    stringBuilder.append(ARRAY_SPLITTER);
+                }
+            }
+            stringBuilder.append(ARRAY_END_SYMBOL);
+            return stringBuilder.toString();
+        } else {
+            return convertObjectArray((Object[]) input);
+        }
+    }
+
+    private static String convertLongArray(Object input) {
+        if (input instanceof long[]) {
+            long[] inputArray = (long[]) input;
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append(ARRAY_START_SYMBOL);
+            for (int i = 0; i < inputArray.length; i++) {
+                stringBuilder.append(inputArray[i]);
+                if (i != inputArray.length - 1) {
+                    stringBuilder.append(ARRAY_SPLITTER);
+                }
+            }
+            stringBuilder.append(ARRAY_END_SYMBOL);
+            return stringBuilder.toString();
+        } else {
+            return convertObjectArray((Object[]) input);
+        }
+    }
+
+    private static String convertFloatArray(Object input) {
+        if (input instanceof float[]) {
+            float[] inputArray = (float[]) input;
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append(ARRAY_START_SYMBOL);
+            for (int i = 0; i < inputArray.length; i++) {
+                stringBuilder.append(inputArray[i]);
+                if (i != inputArray.length - 1) {
+                    stringBuilder.append(ARRAY_SPLITTER);
+                }
+            }
+            stringBuilder.append(ARRAY_END_SYMBOL);
+            return stringBuilder.toString();
+        } else {
+            return convertObjectArray((Object[]) input);
+        }
+    }
+
+    private static String convertDoubleArray(Object input) {
+        if (input instanceof double[]) {
+            double[] inputArray = (double[]) input;
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append(ARRAY_START_SYMBOL);
+            for (int i = 0; i < inputArray.length; i++) {
+                stringBuilder.append(inputArray[i]);
+                if (i != inputArray.length - 1) {
+                    stringBuilder.append(ARRAY_SPLITTER);
+                }
+            }
+            stringBuilder.append(ARRAY_END_SYMBOL);
+            return stringBuilder.toString();
+        } else {
+            return convertObjectArray((Object[]) input);
+        }
+    }

Review comment:
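       Generics cannot be used here: Java type parameters cannot be bound to
       primitive types, so each primitive array type (boolean[], byte[], short[],
       int[], long[], float[], double[]) needs its own conversion method.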




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org