You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/12 07:53:24 UTC
[incubator-inlong] branch master updated: [INLONG-2403][Sort] Fix the problem that Parquet format doesn't work (#2473)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ef67756 [INLONG-2403][Sort] Fix the problem that Parquet format doesn't work (#2473)
ef67756 is described below
commit ef677566fc1c8373235e13ac4e141064c9955e74
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Sat Feb 12 15:53:16 2022 +0800
[INLONG-2403][Sort] Fix the problem that Parquet format doesn't work (#2473)
Co-authored-by: tianqiwan <ti...@tencent.com>
---
.../hive/formats/parquet/ParquetRowWriter.java | 18 ++-
.../formats/parquet/ParquetSchemaConverter.java | 3 +-
.../formats/parquet/ParquetBulkWriterTest.java | 135 +++++++++++++++++++++
3 files changed, 149 insertions(+), 7 deletions(-)
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetRowWriter.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetRowWriter.java
index ebd763a..6252e11 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetRowWriter.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetRowWriter.java
@@ -18,6 +18,8 @@
package org.apache.inlong.sort.flink.hive.formats.parquet;
+import java.sql.Timestamp;
+import java.util.Date;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
@@ -72,28 +74,32 @@ public class ParquetRowWriter {
switch (t.getTypeRoot()) {
case CHAR:
case VARCHAR:
- case DECIMAL:
return new StringWriter();
case BOOLEAN:
return new BooleanWriter();
case BINARY:
case VARBINARY:
return new BinaryWriter();
+ case DECIMAL:
+ return (row, ordinal) -> recordConsumer.addBinary(Binary.fromReusedByteArray(
+ row.getField(ordinal).toString().getBytes()));
case TINYINT:
return new ByteWriter();
case SMALLINT:
return new ShortWriter();
- case DATE:
- case TIME_WITHOUT_TIME_ZONE:
case INTEGER:
return new IntWriter();
- case TIMESTAMP_WITHOUT_TIME_ZONE:
case BIGINT:
return new LongWriter();
case FLOAT:
return new FloatWriter();
case DOUBLE:
return new DoubleWriter();
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ return (row, ordinal) -> recordConsumer.addInteger((int) ((Date) row.getField(ordinal)).getTime());
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return (row, ordinal) -> recordConsumer.addLong(((Timestamp) row.getField(ordinal)).getTime());
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
@@ -119,7 +125,7 @@ public class ParquetRowWriter {
@Override
public void write(Row row, int ordinal) {
- recordConsumer.addInteger((Integer) row.getField(ordinal));
+ recordConsumer.addInteger((Byte) row.getField(ordinal));
}
}
@@ -127,7 +133,7 @@ public class ParquetRowWriter {
@Override
public void write(Row row, int ordinal) {
- recordConsumer.addInteger((Integer) row.getField(ordinal));
+ recordConsumer.addInteger((Short) row.getField(ordinal));
}
}
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetSchemaConverter.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetSchemaConverter.java
index 35e08b6..bd3c4de 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetSchemaConverter.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetSchemaConverter.java
@@ -598,7 +598,8 @@ public class ParquetSchemaConverter {
.as(OriginalType.TIME_MILLIS)
.named(name);
case TIMESTAMP_WITHOUT_TIME_ZONE:
- return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
+ return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
+ .as(OriginalType.TIMESTAMP_MILLIS)
.named(name);
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
diff --git a/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetBulkWriterTest.java b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetBulkWriterTest.java
new file mode 100644
index 0000000..7b0a681
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetBulkWriterTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.hive.formats.parquet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import org.apache.flink.core.fs.local.LocalDataOutputStream;
+import org.apache.flink.formats.parquet.ParquetBulkWriter;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.fs.Path;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.flink.hive.HiveSinkHelper;
+import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
+import org.apache.inlong.sort.formats.common.ByteFormatInfo;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.DecimalFormatInfo;
+import org.apache.inlong.sort.formats.common.DoubleFormatInfo;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.ShortFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveFieldPartitionInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HivePartitionInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.ParquetFileFormat;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetReader.Builder;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class ParquetBulkWriterTest {
+ @ClassRule
+ public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Test
+ public void testAllSupportedTypes() throws IOException {
+ File testFile = temporaryFolder.newFile("test.parquet");
+ ParquetBulkWriter<Row> parquetBulkWriter = (ParquetBulkWriter<Row>) HiveSinkHelper
+ .createBulkWriterFactory(prepareHiveSinkInfo(), new Configuration())
+ .create(new LocalDataOutputStream(testFile));
+ parquetBulkWriter.addElement(Row.of(
+ "string",
+ false,
+ (byte) 1,
+ (short) 11,
+ 111,
+ 1111L,
+ (float) 1.1,
+ 1.11,
+ new BigDecimal("123456789123456789"),
+ new Date(0),
+ new Time(0),
+ new Timestamp(0)
+ ));
+ parquetBulkWriter.finish();
+
+ GroupReadSupport readSupport = new GroupReadSupport();
+ Builder<Group> builder = ParquetReader.builder(readSupport, new Path(testFile.toURI()));
+ ParquetReader<Group> reader = builder.build();
+ Group line = reader.read();
+
+ assertNotNull(line);
+ assertEquals("string", line.getString(0, 0));
+ assertFalse(line.getBoolean(1, 0));
+ assertEquals(1, line.getInteger(2, 0));
+ assertEquals(11, line.getInteger(3, 0));
+ assertEquals(111, line.getInteger(4, 0));
+ assertEquals(1111, line.getLong(5, 0));
+ assertEquals(1.1, line.getFloat(6, 0), 0.01);
+ assertEquals(1.11, line.getDouble(7, 0), 0.001);
+ assertEquals("123456789123456789", line.getString(8, 0));
+ assertEquals(0, line.getInteger(9, 0));
+ assertEquals(0, line.getInteger(10, 0));
+ assertEquals(0, line.getLong(11, 0));
+ }
+
+ private HiveSinkInfo prepareHiveSinkInfo() {
+ return new HiveSinkInfo(
+ new FieldInfo[] {
+ new FieldInfo("f1", StringFormatInfo.INSTANCE),
+ new FieldInfo("f2", BooleanFormatInfo.INSTANCE),
+ new FieldInfo("f3", ByteFormatInfo.INSTANCE),
+ new FieldInfo("f4", ShortFormatInfo.INSTANCE),
+ new FieldInfo("f5", IntFormatInfo.INSTANCE),
+ new FieldInfo("f6", LongFormatInfo.INSTANCE),
+ new FieldInfo("f7", FloatFormatInfo.INSTANCE),
+ new FieldInfo("f8", DoubleFormatInfo.INSTANCE),
+ new FieldInfo("f9", DecimalFormatInfo.INSTANCE),
+ new FieldInfo("f10", new DateFormatInfo()),
+ new FieldInfo("f11", new TimeFormatInfo()),
+ new FieldInfo("f12", new TimestampFormatInfo())
+ },
+ "jdbc:mysql://127.0.0.1:3306/testDatabaseName",
+ "testDatabaseName",
+ "testTableName",
+ "testUsername",
+ "testPassword",
+ "/path",
+ new HivePartitionInfo[]{
+ new HiveFieldPartitionInfo("f13"),
+ },
+ new ParquetFileFormat()
+ );
+ }
+}