You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/02/21 08:29:02 UTC

[GitHub] [incubator-inlong] ywh19911024 commented on a change in pull request #2615: [INLONG-2614][Sort] Support array and map data structures in Hive sink and ClickHouse sink

ywh19911024 commented on a change in pull request #2615:
URL: https://github.com/apache/incubator-inlong/pull/2615#discussion_r810859692



##########
File path: inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/ClickHouseRowConverter.java
##########
@@ -83,8 +91,64 @@ private static void setField(
             statement.setDate(index + 1, (Date) value);
         } else if (typeInfo instanceof TimestampTypeInfo) {
             statement.setTimestamp(index + 1, (Timestamp) value);
+        } else if (typeInfo instanceof ArrayTypeInfo) {
+            TypeInfo elementTypeInfo = ((ArrayTypeInfo) typeInfo).getElementTypeInfo();
+            statement.setArray(index + 1, new ClickHouseArray(
+                    getClickHouseDataTypeFromTypeInfo(elementTypeInfo), toObjectArray(elementTypeInfo, value)));
+        } else if (typeInfo instanceof MapTypeInfo) {
+            Map<?, ?> mapValue = (Map<?, ?>) value;
+            int size = mapValue.size();
+            Object[] kvps = new Object[size * 2];
+            int i = 0;
+            for (Map.Entry<?, ?> entry : mapValue.entrySet()) {
+                kvps[i] = entry.getKey();
+                kvps[i + 1] = entry.getValue();
+                i += 2;
+            }
+            statement.setObject(index + 1, Utils.mapOf(kvps));

Review comment:
       MapTypeInfo类型的转换可以抽出一个方法,写个单测

##########
File path: inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/TextRowWriter.java
##########
@@ -23,46 +23,261 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.Map;
 import java.util.zip.GZIPOutputStream;
+
 import org.anarres.lzo.LzoAlgorithm;
 import org.anarres.lzo.LzoCompressor;
 import org.anarres.lzo.LzoLibrary;
 import org.anarres.lzo.LzopOutputStream;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.shaded.guava18.com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.Row;
 import org.apache.inlong.sort.configuration.Configuration;
 import org.apache.inlong.sort.configuration.Constants;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.TextFileFormat;
 
 public class TextRowWriter implements BulkWriter<Row> {
 
+    private static final String NULL_STRING = "null";
+
+    private static final String ARRAY_SPLITTER = ",";
+    private static final String ARRAY_START_SYMBOL = "[";
+    private static final String ARRAY_END_SYMBOL = "]";
+
+    private static final String MAP_START_SYMBOL = "{";
+    private static final String MAP_END_SYMBOL = "}";
+    private static final String MAP_ENTRY_SPLITTER = ",";
+    private static final String MAP_KEY_VALUE_SPLITTER = "=";
+
     private final OutputStream outputStream;
 
     private final TextFileFormat textFileFormat;
 
     private final int bufferSize;
 
+    private final LogicalType[] fieldTypes;
+
     public TextRowWriter(
             FSDataOutputStream fsDataOutputStream,
             TextFileFormat textFileFormat,
-            Configuration config) throws IOException {
+            Configuration config,
+            LogicalType[] fieldTypes) throws IOException {
         this.bufferSize = checkNotNull(config).getInteger(Constants.SINK_HIVE_TEXT_BUFFER_SIZE);
         this.outputStream = getCompressionOutputStream(checkNotNull(fsDataOutputStream), textFileFormat);
         this.textFileFormat = checkNotNull(textFileFormat);
+        this.fieldTypes = checkNotNull(fieldTypes);
     }
 
     @Override
     public void addElement(Row row) throws IOException {
         for (int i = 0; i < row.getArity(); i++) {
-            outputStream.write(String.valueOf(row.getField(i)).getBytes(StandardCharsets.UTF_8));
+            String fieldStr = convertField(row.getField(i), fieldTypes[i]);
+            outputStream.write(fieldStr.getBytes(StandardCharsets.UTF_8));
             if (i != row.getArity() - 1) {
                 outputStream.write(textFileFormat.getSplitter());
             }
         }
         outputStream.write(10); // start a new line
     }
 
+    @VisibleForTesting
+    static String convertField(Object field, LogicalType fieldType) {
+        if (field == null) {
+            return NULL_STRING;
+        }
+
+        switch (fieldType.getTypeRoot()) {
+            case ARRAY:
+                return convertArray(field, ((ArrayType) fieldType).getElementType());
+            case MAP:
+                return convertMap((Map<?, ?>) field);
+            default:
+                return String.valueOf(field);
+        }
+    }
+
+    private static String convertArray(Object input, LogicalType elementType) {
+        switch (elementType.getTypeRoot()) {
+            case BOOLEAN:
+                return convertBooleanArray(input);
+            case TINYINT:
+                return convertByteArray(input);
+            case SMALLINT:
+                return convertShortArray(input);
+            case INTEGER:
+                return convertIntArray(input);
+            case BIGINT:
+                return convertLongArray(input);
+            case FLOAT:
+                return convertFloatArray(input);
+            case DOUBLE:
+                return convertDoubleArray(input);
+            default:
+                return convertObjectArray((Object[]) input);
+        }
+    }
+
+    private static String convertObjectArray(Object[] objArray) {
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append(ARRAY_START_SYMBOL);
+        for (int i = 0; i < objArray.length; i++) {
+            stringBuilder.append(objArray[i]);
+            if (i != objArray.length - 1) {
+                stringBuilder.append(ARRAY_SPLITTER);
+            }
+        }
+        stringBuilder.append(ARRAY_END_SYMBOL);
+        return stringBuilder.toString();
+    }
+
+    private static String convertBooleanArray(Object input) {
+        if (input instanceof boolean[]) {
+            boolean[] inputArray = (boolean[]) input;
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append(ARRAY_START_SYMBOL);
+            for (int i = 0; i < inputArray.length; i++) {
+                stringBuilder.append(inputArray[i]);
+                if (i != inputArray.length - 1) {
+                    stringBuilder.append(ARRAY_SPLITTER);
+                }
+            }
+            stringBuilder.append(ARRAY_END_SYMBOL);
+            return stringBuilder.toString();
+        } else {
+            return convertObjectArray((Object[]) input);
+        }
+    }
+
+    private static String convertByteArray(Object input) {
+        if (input instanceof byte[]) {
+            byte[] inputArray = (byte[]) input;
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append(ARRAY_START_SYMBOL);
+            for (int i = 0; i < inputArray.length; i++) {
+                stringBuilder.append(inputArray[i]);
+                if (i != inputArray.length - 1) {
+                    stringBuilder.append(ARRAY_SPLITTER);
+                }
+            }
+            stringBuilder.append(ARRAY_END_SYMBOL);
+            return stringBuilder.toString();
+        } else {
+            return convertObjectArray((Object[]) input);
+        }
+    }
+
+    private static String convertShortArray(Object input) {
+        if (input instanceof short[]) {
+            short[] inputArray = (short[]) input;
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append(ARRAY_START_SYMBOL);
+            for (int i = 0; i < inputArray.length; i++) {
+                stringBuilder.append(inputArray[i]);
+                if (i != inputArray.length - 1) {
+                    stringBuilder.append(ARRAY_SPLITTER);
+                }
+            }
+            stringBuilder.append(ARRAY_END_SYMBOL);
+            return stringBuilder.toString();
+        } else {
+            return convertObjectArray((Object[]) input);
+        }
+    }
+
+    private static String convertIntArray(Object input) {
+        if (input instanceof int[]) {
+            int[] inputArray = (int[]) input;
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append(ARRAY_START_SYMBOL);
+            for (int i = 0; i < inputArray.length; i++) {
+                stringBuilder.append(inputArray[i]);
+                if (i != inputArray.length - 1) {
+                    stringBuilder.append(ARRAY_SPLITTER);
+                }
+            }
+            stringBuilder.append(ARRAY_END_SYMBOL);
+            return stringBuilder.toString();
+        } else {
+            return convertObjectArray((Object[]) input);
+        }
+    }
+
+    private static String convertLongArray(Object input) {
+        if (input instanceof long[]) {
+            long[] inputArray = (long[]) input;
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append(ARRAY_START_SYMBOL);
+            for (int i = 0; i < inputArray.length; i++) {
+                stringBuilder.append(inputArray[i]);
+                if (i != inputArray.length - 1) {
+                    stringBuilder.append(ARRAY_SPLITTER);
+                }
+            }
+            stringBuilder.append(ARRAY_END_SYMBOL);
+            return stringBuilder.toString();
+        } else {
+            return convertObjectArray((Object[]) input);
+        }
+    }
+
+    private static String convertFloatArray(Object input) {
+        if (input instanceof float[]) {
+            float[] inputArray = (float[]) input;
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append(ARRAY_START_SYMBOL);
+            for (int i = 0; i < inputArray.length; i++) {
+                stringBuilder.append(inputArray[i]);
+                if (i != inputArray.length - 1) {
+                    stringBuilder.append(ARRAY_SPLITTER);
+                }
+            }
+            stringBuilder.append(ARRAY_END_SYMBOL);
+            return stringBuilder.toString();
+        } else {
+            return convertObjectArray((Object[]) input);
+        }
+    }
+
+    private static String convertDoubleArray(Object input) {
+        if (input instanceof double[]) {
+            double[] inputArray = (double[]) input;
+            StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder.append(ARRAY_START_SYMBOL);
+            for (int i = 0; i < inputArray.length; i++) {
+                stringBuilder.append(inputArray[i]);
+                if (i != inputArray.length - 1) {
+                    stringBuilder.append(ARRAY_SPLITTER);
+                }
+            }
+            stringBuilder.append(ARRAY_END_SYMBOL);
+            return stringBuilder.toString();
+        } else {
+            return convertObjectArray((Object[]) input);
+        }
+    }

Review comment:
       这几个方式程序结构是类似的,是不是可以考虑用泛型来代替,用到某一个具体的类的时候,利用这个类来构造就可以了
   
   

##########
File path: inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetRowWriter.java
##########
@@ -104,7 +114,15 @@ private FieldWriter createWriter(LogicalType t, Type type) {
                     throw new UnsupportedOperationException("Unsupported type: " + type);
             }
         } else {
-            throw new IllegalArgumentException("Unsupported  data type: " + t);
+            switch (t.getTypeRoot()) {
+                case ARRAY:
+                    return new ArrayWriter(((ArrayType) t).getElementType(),
+                            type.asGroupType().getType(0).asGroupType().getType(0));

Review comment:
       type.asGroupType().getType(0).asGroupType().getType(0)这里是说ARRAY这个类型里边还有可能是groupType这种类型?

##########
File path: inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetRowWriter.java
##########
@@ -184,4 +202,281 @@ public void write(Row row, int ordinal) {
             recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) row.getField(ordinal)));
         }
     }
+
+    private class ArrayWriter implements FieldWriter {
+
+        private final LogicalType elementTypeFlink;
+
+        private final Type elementTypeParquet;
+
+        public ArrayWriter(LogicalType elementTypeFlink, Type elementTypeParquet) {
+            this.elementTypeFlink = elementTypeFlink;
+            this.elementTypeParquet = elementTypeParquet;
+        }
+
+        @Override
+        public void write(Row row, int ordinal) {
+            if (elementTypeParquet.isPrimitive()) {
+                switch (elementTypeFlink.getTypeRoot()) {
+                    case CHAR:
+                    case VARCHAR:
+                    case DECIMAL:
+                    case DATE:
+                    case TIME_WITHOUT_TIME_ZONE:
+                    case TIMESTAMP_WITHOUT_TIME_ZONE:
+                        writeObjectArray(row.getField(ordinal));
+                        break;
+                    case BOOLEAN:
+                        writeBooleanArray(row.getField(ordinal));
+                        break;
+                    case TINYINT:
+                        writeTinyIntArray(row.getField(ordinal));
+                        break;
+                    case SMALLINT:
+                        writeShortArray(row.getField(ordinal));
+                        break;
+                    case INTEGER:
+                        writeIntArray(row.getField(ordinal));
+                        break;
+                    case BIGINT:
+                        writeLongArray(row.getField(ordinal));
+                        break;
+                    case FLOAT:
+                        writeFloatArray(row.getField(ordinal));
+                        break;
+                    case DOUBLE:
+                        writeDoubleArray(row.getField(ordinal));
+                        break;
+                    default:
+                        throw new UnsupportedOperationException(
+                                "Unsupported element type in array: " + elementTypeParquet);
+                }
+            } else {
+                throw new UnsupportedOperationException("Unsupported element type in array: " + elementTypeParquet);
+            }
+        }
+
+        private void writeObjectArray(Object input) {
+            recordConsumer.startGroup();
+            if (input != null) {
+                Object[] inputArray = (Object[]) input;
+                if (inputArray.length > 0) {
+                    recordConsumer.startField(ARRAY_FIELD_NAME, 0);
+                    for (Object ele : inputArray) {
+                        startGroupAndField(elementTypeParquet.getName(), 0);
+                        FieldWriter writer = createWriter(elementTypeFlink, elementTypeParquet);
+                        writer.write(Row.of(ele), 0);
+                        endGroupAndField(elementTypeParquet.getName(), 0);
+                    }
+                    recordConsumer.endField(ARRAY_FIELD_NAME, 0);
+                }
+            }
+            recordConsumer.endGroup();
+        }
+
+        private void writeBooleanArray(Object input) {
+            if (input instanceof boolean[]) {
+                boolean[] inputArray = (boolean[]) input;
+                recordConsumer.startGroup();
+                if (inputArray.length > 0) {
+                    recordConsumer.startField(ARRAY_FIELD_NAME, 0);
+                    for (boolean ele : inputArray) {
+                        startGroupAndField(elementTypeParquet.getName(), 0);
+                        FieldWriter writer = createWriter(elementTypeFlink, elementTypeParquet);
+                        writer.write(Row.of(ele), 0);
+                        endGroupAndField(elementTypeParquet.getName(), 0);
+                    }
+                    recordConsumer.endField(ARRAY_FIELD_NAME, 0);
+                }
+                recordConsumer.endGroup();
+            } else {
+                writeObjectArray(input);
+            }
+        }
+
+        private void writeTinyIntArray(Object input) {
+            if (input instanceof byte[]) {
+                byte[] inputArray = (byte[]) input;
+                recordConsumer.startGroup();
+                if (inputArray.length > 0) {
+                    recordConsumer.startField(ARRAY_FIELD_NAME, 0);
+                    for (byte ele : inputArray) {
+                        startGroupAndField(elementTypeParquet.getName(), 0);
+                        FieldWriter writer = createWriter(elementTypeFlink, elementTypeParquet);
+                        writer.write(Row.of(ele), 0);
+                        endGroupAndField(elementTypeParquet.getName(), 0);
+                    }
+                    recordConsumer.endField(ARRAY_FIELD_NAME, 0);
+                }
+                recordConsumer.endGroup();
+            } else {
+                writeObjectArray(input);
+            }
+        }
+
+        private void writeShortArray(Object input) {
+            if (input instanceof short[]) {
+                short[] inputArray = (short[]) input;
+                recordConsumer.startGroup();
+                if (inputArray.length > 0) {
+                    recordConsumer.startField(ARRAY_FIELD_NAME, 0);
+                    for (short ele : inputArray) {
+                        startGroupAndField(elementTypeParquet.getName(), 0);
+                        FieldWriter writer = createWriter(elementTypeFlink, elementTypeParquet);
+                        writer.write(Row.of(ele), 0);
+                        endGroupAndField(elementTypeParquet.getName(), 0);
+                    }
+                    recordConsumer.endField(ARRAY_FIELD_NAME, 0);
+                }
+                recordConsumer.endGroup();
+            } else {
+                writeObjectArray(input);
+            }
+        }
+
+        private void writeIntArray(Object input) {
+            if (input instanceof int[]) {
+                int[] inputArray = (int[]) input;
+                recordConsumer.startGroup();
+                if (inputArray.length > 0) {
+                    recordConsumer.startField(ARRAY_FIELD_NAME, 0);
+                    for (int ele : inputArray) {
+                        startGroupAndField(elementTypeParquet.getName(), 0);
+                        FieldWriter writer = createWriter(elementTypeFlink, elementTypeParquet);
+                        writer.write(Row.of(ele), 0);
+                        endGroupAndField(elementTypeParquet.getName(), 0);
+                    }
+                    recordConsumer.endField(ARRAY_FIELD_NAME, 0);
+                }
+                recordConsumer.endGroup();
+            } else {
+                writeObjectArray(input);
+            }
+        }
+
+        private void writeLongArray(Object input) {
+            if (input instanceof long[]) {
+                long[] inputArray = (long[]) input;
+                recordConsumer.startGroup();
+                if (inputArray.length > 0) {
+                    recordConsumer.startField(ARRAY_FIELD_NAME, 0);
+                    for (long ele : inputArray) {
+                        startGroupAndField(elementTypeParquet.getName(), 0);
+                        FieldWriter writer = createWriter(elementTypeFlink, elementTypeParquet);
+                        writer.write(Row.of(ele), 0);
+                        endGroupAndField(elementTypeParquet.getName(), 0);
+                    }
+                    recordConsumer.endField(ARRAY_FIELD_NAME, 0);
+                }
+                recordConsumer.endGroup();
+            } else {
+                writeObjectArray(input);
+            }
+        }
+
+        private void writeFloatArray(Object input) {
+            if (input instanceof float[]) {
+                float[] inputArray = (float[]) input;
+                recordConsumer.startGroup();
+                if (inputArray.length > 0) {
+                    recordConsumer.startField(ARRAY_FIELD_NAME, 0);
+                    for (float ele : inputArray) {
+                        startGroupAndField(elementTypeParquet.getName(), 0);
+                        FieldWriter writer = createWriter(elementTypeFlink, elementTypeParquet);
+                        writer.write(Row.of(ele), 0);
+                        endGroupAndField(elementTypeParquet.getName(), 0);
+                    }
+                    recordConsumer.endField(ARRAY_FIELD_NAME, 0);
+                }
+                recordConsumer.endGroup();
+            } else {
+                writeObjectArray(input);
+            }
+        }
+
+        private void writeDoubleArray(Object input) {
+            if (input instanceof double[]) {
+                double[] inputArray = (double[]) input;
+                recordConsumer.startGroup();
+                if (inputArray.length > 0) {
+                    recordConsumer.startField(ARRAY_FIELD_NAME, 0);
+                    for (double ele : inputArray) {
+                        startGroupAndField(elementTypeParquet.getName(), 0);
+                        FieldWriter writer = createWriter(elementTypeFlink, elementTypeParquet);
+                        writer.write(Row.of(ele), 0);
+                        endGroupAndField(elementTypeParquet.getName(), 0);
+                    }
+                    recordConsumer.endField(ARRAY_FIELD_NAME, 0);
+                }
+                recordConsumer.endGroup();
+            } else {
+                writeObjectArray(input);
+            }
+        }
+
+    }
+
+    private class MapWriter implements FieldWriter {
+
+        private final LogicalType keyTypeFlink;
+
+        private final LogicalType valueTypeFlink;
+
+        private final Type keyTypeParquet;
+
+        private final Type valueTypeParquet;
+
+        public MapWriter(MapType mapTypeFlink, Type mapTypeParquet) {
+            this.keyTypeFlink = mapTypeFlink.getKeyType();
+            this.valueTypeFlink = mapTypeFlink.getValueType();
+            GroupType groupType = mapTypeParquet.asGroupType().getType(0).asGroupType();
+            this.keyTypeParquet = groupType.getType(0);
+            this.valueTypeParquet = groupType.getType(1);
+        }
+
+        @Override
+        public void write(Row row, int ordinal) {
+            recordConsumer.startGroup();
+            Object inputField = row.getField(ordinal);
+            if (inputField == null) {
+                return;
+            }
+
+            Map<?, ?> inputMap = (Map<?, ?>) row.getField(ordinal);

Review comment:
       这里可以直接用前面的inputField来转换

##########
File path: inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetRowWriter.java
##########
@@ -184,4 +202,281 @@ public void write(Row row, int ordinal) {
             recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) row.getField(ordinal)));
         }
     }
+
+    private class ArrayWriter implements FieldWriter {
+
+        private final LogicalType elementTypeFlink;
+
+        private final Type elementTypeParquet;
+
+        public ArrayWriter(LogicalType elementTypeFlink, Type elementTypeParquet) {
+            this.elementTypeFlink = elementTypeFlink;
+            this.elementTypeParquet = elementTypeParquet;
+        }
+
+        @Override
+        public void write(Row row, int ordinal) {
+            if (elementTypeParquet.isPrimitive()) {
+                switch (elementTypeFlink.getTypeRoot()) {
+                    case CHAR:
+                    case VARCHAR:
+                    case DECIMAL:
+                    case DATE:
+                    case TIME_WITHOUT_TIME_ZONE:
+                    case TIMESTAMP_WITHOUT_TIME_ZONE:
+                        writeObjectArray(row.getField(ordinal));
+                        break;
+                    case BOOLEAN:
+                        writeBooleanArray(row.getField(ordinal));
+                        break;
+                    case TINYINT:
+                        writeTinyIntArray(row.getField(ordinal));
+                        break;
+                    case SMALLINT:
+                        writeShortArray(row.getField(ordinal));
+                        break;
+                    case INTEGER:
+                        writeIntArray(row.getField(ordinal));
+                        break;
+                    case BIGINT:
+                        writeLongArray(row.getField(ordinal));
+                        break;
+                    case FLOAT:
+                        writeFloatArray(row.getField(ordinal));
+                        break;
+                    case DOUBLE:
+                        writeDoubleArray(row.getField(ordinal));
+                        break;
+                    default:
+                        throw new UnsupportedOperationException(
+                                "Unsupported element type in array: " + elementTypeParquet);
+                }
+            } else {
+                throw new UnsupportedOperationException("Unsupported element type in array: " + elementTypeParquet);
+            }
+        }
+
+        private void writeObjectArray(Object input) {
+            recordConsumer.startGroup();
+            if (input != null) {
+                Object[] inputArray = (Object[]) input;
+                if (inputArray.length > 0) {
+                    recordConsumer.startField(ARRAY_FIELD_NAME, 0);
+                    for (Object ele : inputArray) {
+                        startGroupAndField(elementTypeParquet.getName(), 0);
+                        FieldWriter writer = createWriter(elementTypeFlink, elementTypeParquet);
+                        writer.write(Row.of(ele), 0);
+                        endGroupAndField(elementTypeParquet.getName(), 0);
+                    }
+                    recordConsumer.endField(ARRAY_FIELD_NAME, 0);
+                }
+            }
+            recordConsumer.endGroup();
+        }
+
+        private void writeBooleanArray(Object input) {
+            if (input instanceof boolean[]) {
+                boolean[] inputArray = (boolean[]) input;
+                recordConsumer.startGroup();
+                if (inputArray.length > 0) {
+                    recordConsumer.startField(ARRAY_FIELD_NAME, 0);
+                    for (boolean ele : inputArray) {
+                        startGroupAndField(elementTypeParquet.getName(), 0);
+                        FieldWriter writer = createWriter(elementTypeFlink, elementTypeParquet);
+                        writer.write(Row.of(ele), 0);
+                        endGroupAndField(elementTypeParquet.getName(), 0);
+                    }
+                    recordConsumer.endField(ARRAY_FIELD_NAME, 0);
+                }
+                recordConsumer.endGroup();
+            } else {
+                writeObjectArray(input);
+            }
+        }
+
+        private void writeTinyIntArray(Object input) {
+            if (input instanceof byte[]) {
+                byte[] inputArray = (byte[]) input;
+                recordConsumer.startGroup();
+                if (inputArray.length > 0) {
+                    recordConsumer.startField(ARRAY_FIELD_NAME, 0);
+                    for (byte ele : inputArray) {
+                        startGroupAndField(elementTypeParquet.getName(), 0);
+                        FieldWriter writer = createWriter(elementTypeFlink, elementTypeParquet);
+                        writer.write(Row.of(ele), 0);
+                        endGroupAndField(elementTypeParquet.getName(), 0);
+                    }
+                    recordConsumer.endField(ARRAY_FIELD_NAME, 0);
+                }
+                recordConsumer.endGroup();
+            } else {
+                writeObjectArray(input);
+            }
+        }
+
+        private void writeShortArray(Object input) {
+            if (input instanceof short[]) {
+                short[] inputArray = (short[]) input;
+                recordConsumer.startGroup();
+                if (inputArray.length > 0) {
+                    recordConsumer.startField(ARRAY_FIELD_NAME, 0);
+                    for (short ele : inputArray) {
+                        startGroupAndField(elementTypeParquet.getName(), 0);
+                        FieldWriter writer = createWriter(elementTypeFlink, elementTypeParquet);
+                        writer.write(Row.of(ele), 0);
+                        endGroupAndField(elementTypeParquet.getName(), 0);
+                    }
+                    recordConsumer.endField(ARRAY_FIELD_NAME, 0);
+                }
+                recordConsumer.endGroup();
+            } else {
+                writeObjectArray(input);
+            }
+        }
+
+        private void writeIntArray(Object input) {
+            if (input instanceof int[]) {
+                int[] inputArray = (int[]) input;
+                recordConsumer.startGroup();
+                if (inputArray.length > 0) {
+                    recordConsumer.startField(ARRAY_FIELD_NAME, 0);
+                    for (int ele : inputArray) {
+                        startGroupAndField(elementTypeParquet.getName(), 0);
+                        FieldWriter writer = createWriter(elementTypeFlink, elementTypeParquet);
+                        writer.write(Row.of(ele), 0);
+                        endGroupAndField(elementTypeParquet.getName(), 0);
+                    }
+                    recordConsumer.endField(ARRAY_FIELD_NAME, 0);
+                }
+                recordConsumer.endGroup();
+            } else {
+                writeObjectArray(input);
+            }
+        }
+
+        private void writeLongArray(Object input) {
+            if (input instanceof long[]) {
+                long[] inputArray = (long[]) input;
+                recordConsumer.startGroup();
+                if (inputArray.length > 0) {
+                    recordConsumer.startField(ARRAY_FIELD_NAME, 0);
+                    for (long ele : inputArray) {
+                        startGroupAndField(elementTypeParquet.getName(), 0);
+                        FieldWriter writer = createWriter(elementTypeFlink, elementTypeParquet);
+                        writer.write(Row.of(ele), 0);
+                        endGroupAndField(elementTypeParquet.getName(), 0);
+                    }
+                    recordConsumer.endField(ARRAY_FIELD_NAME, 0);
+                }
+                recordConsumer.endGroup();
+            } else {
+                writeObjectArray(input);
+            }
+        }
+
+        private void writeFloatArray(Object input) {
+            if (input instanceof float[]) {
+                float[] inputArray = (float[]) input;
+                recordConsumer.startGroup();
+                if (inputArray.length > 0) {
+                    recordConsumer.startField(ARRAY_FIELD_NAME, 0);
+                    for (float ele : inputArray) {
+                        startGroupAndField(elementTypeParquet.getName(), 0);
+                        FieldWriter writer = createWriter(elementTypeFlink, elementTypeParquet);
+                        writer.write(Row.of(ele), 0);
+                        endGroupAndField(elementTypeParquet.getName(), 0);
+                    }
+                    recordConsumer.endField(ARRAY_FIELD_NAME, 0);
+                }
+                recordConsumer.endGroup();
+            } else {
+                writeObjectArray(input);
+            }
+        }
+
+        private void writeDoubleArray(Object input) {
+            if (input instanceof double[]) {
+                double[] inputArray = (double[]) input;
+                recordConsumer.startGroup();
+                if (inputArray.length > 0) {
+                    recordConsumer.startField(ARRAY_FIELD_NAME, 0);
+                    for (double ele : inputArray) {
+                        startGroupAndField(elementTypeParquet.getName(), 0);
+                        FieldWriter writer = createWriter(elementTypeFlink, elementTypeParquet);
+                        writer.write(Row.of(ele), 0);
+                        endGroupAndField(elementTypeParquet.getName(), 0);
+                    }
+                    recordConsumer.endField(ARRAY_FIELD_NAME, 0);
+                }
+                recordConsumer.endGroup();
+            } else {
+                writeObjectArray(input);
+            }
+        }

Review comment:
       这几个方法同样程序结构是一样的,是不是也可以用泛型来处理,不用多次书写




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org