Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/06/01 04:04:30 UTC

[GitHub] [incubator-doris-flink-connector] gj-zhang opened a new pull request, #33: [FIX] fix flink date and timestamp type not mapping.

gj-zhang opened a new pull request, #33:
URL: https://github.com/apache/incubator-doris-flink-connector/pull/33

   # Proposed changes
   
   Make the Flink date and timestamp types map correctly.
   
   ## Problem Summary:
   
   Describe the overview of changes.
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: (Yes/No/I Don't know)
   2. Has unit tests been added: (Yes/No/No Need)
   3. Has document been added or modified: (Yes/No/No Need)
   4. Does it need to update dependencies: (Yes/No)
   5. Are there any changes that cannot be rolled back: (Yes/No)
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris-flink-connector] JNSimba commented on a diff in pull request #33: [FIX] fix flink date and timestamp type not mapping.

Posted by GitBox <gi...@apache.org>.
JNSimba commented on code in PR #33:
URL: https://github.com/apache/incubator-doris-flink-connector/pull/33#discussion_r886640776


##########
flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java:
##########
@@ -17,16 +17,15 @@
 package org.apache.doris.flink.deserialization.converter;
 
 import org.apache.doris.flink.serialization.RowBatch;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.*;

Review Comment:
   Please do not use a wildcard import (`import ...*`); keep the explicit imports.





[GitHub] [incubator-doris-flink-connector] gj-zhang commented on a diff in pull request #33: [FIX] fix flink date and timestamp type not mapping.

Posted by GitBox <gi...@apache.org>.
gj-zhang commented on code in PR #33:
URL: https://github.com/apache/incubator-doris-flink-connector/pull/33#discussion_r893281949


##########
flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java:
##########
@@ -17,23 +17,33 @@
 package org.apache.doris.flink.deserialization.convert;
 

Review Comment:
   done





[GitHub] [incubator-doris-flink-connector] gj-zhang commented on a diff in pull request #33: [FIX] fix flink date and timestamp type not mapping.

Posted by GitBox <gi...@apache.org>.
gj-zhang commented on code in PR #33:
URL: https://github.com/apache/incubator-doris-flink-connector/pull/33#discussion_r892043741


##########
flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java:
##########
@@ -33,36 +34,32 @@
 import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;
 import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
 import static org.apache.doris.flink.sink.writer.LoadConstants.NULL_VALUE;
-import static org.apache.flink.table.data.RowData.createFieldGetter;
 
 /**
  * Serializer for RowData.
  */
 public class RowDataSerializer implements DorisRecordSerializer<RowData> {
     String[] fieldNames;
-    RowData.FieldGetter[] fieldGetters;
     String type;
     private ObjectMapper objectMapper;
     private final String fieldDelimiter;
     private final boolean enableDelete;
+    private final DorisRowConverter rowConverter;
 
-    private RowDataSerializer(String[] fieldNames, DataType[] dataTypes, String type, String fieldDelimiter, boolean enableDelete) {
+    private RowDataSerializer(String[] fieldNames, RowType rowType, String type, String fieldDelimiter, boolean enableDelete) {

Review Comment:
   Yes, that would be better.





[GitHub] [incubator-doris-flink-connector] JNSimba commented on a diff in pull request #33: [FIX] fix flink date and timestamp type not mapping.

Posted by GitBox <gi...@apache.org>.
JNSimba commented on code in PR #33:
URL: https://github.com/apache/incubator-doris-flink-connector/pull/33#discussion_r886640397


##########
flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java:
##########
@@ -114,6 +118,93 @@ public String parseDeleteSign(RowKind rowKind) {
         }
     }
 
+    private RowData.FieldGetter createFieldGetter(LogicalType fieldType, int fieldPos) {
+        final RowData.FieldGetter fieldGetter;
+        // ordered by type root definition
+        switch (fieldType.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                fieldGetter = row -> row.getString(fieldPos);
+                break;
+            case BOOLEAN:
+                fieldGetter = row -> row.getBoolean(fieldPos);
+                break;
+            case BINARY:
+            case VARBINARY:
+                fieldGetter = row -> row.getBinary(fieldPos);
+                break;
+            case DECIMAL:
+                final int decimalPrecision = getPrecision(fieldType);
+                final int decimalScale = getScale(fieldType);
+                fieldGetter = row -> row.getDecimal(fieldPos, decimalPrecision, decimalScale);
+                break;
+            case TINYINT:
+                fieldGetter = row -> row.getByte(fieldPos);
+                break;
+            case SMALLINT:
+                fieldGetter = row -> row.getShort(fieldPos);
+                break;
+            case INTEGER:
+            case TIME_WITHOUT_TIME_ZONE:
+            case INTERVAL_YEAR_MONTH:
+                fieldGetter = row -> row.getInt(fieldPos);
+                break;
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                fieldGetter = row -> row.getLong(fieldPos);
+                break;
+            case FLOAT:
+                fieldGetter = row -> row.getFloat(fieldPos);
+                break;
+            case DOUBLE:
+                fieldGetter = row -> row.getDouble(fieldPos);
+                break;
+            case DATE:
+                fieldGetter = row -> Date.valueOf(LocalDate.ofEpochDay(row.getInt(fieldPos)));
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                final int timestampPrecision = getPrecision(fieldType);
+                fieldGetter = row -> row.getTimestamp(fieldPos, timestampPrecision);
+                break;
+            case TIMESTAMP_WITH_TIME_ZONE:
+                throw new UnsupportedOperationException();
+            case ARRAY:
+                fieldGetter = row -> row.getArray(fieldPos);
+                break;
+            case MULTISET:
+            case MAP:
+                fieldGetter = row -> row.getMap(fieldPos);
+                break;
+            case ROW:
+            case STRUCTURED_TYPE:
+                final int rowFieldCount = getFieldCount(fieldType);
+                fieldGetter = row -> row.getRow(fieldPos, rowFieldCount);
+                break;
+            case DISTINCT_TYPE:
+                fieldGetter =
+                        createFieldGetter(((DistinctType) fieldType).getSourceType(), fieldPos);
+                break;
+            case RAW:
+                fieldGetter = row -> row.getRawValue(fieldPos);
+                break;
+            case NULL:
+            case SYMBOL:
+            case UNRESOLVED:
+            default:
+                throw new IllegalArgumentException();
+        }
+        if (!fieldType.isNullable()) {
+            return fieldGetter;
+        }
+        return row -> {
+            if (row.isNullAt(fieldPos)) {
+                return null;
+            }
+            return fieldGetter.getFieldOrNull(row);
+        };
+    }
+
     public static Builder builder() {

Review Comment:
   I recommend abstracting the type conversion here and moving it into `DorisRowConverter`. For reference, see `DeserializationConverter` or `org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter`.
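
   For illustration, the suggestion above might look roughly like the following. This is a minimal sketch, not the connector's code: `RowConverterSketch`, its `TypeRoot` enum, and the `Object[]` row are hypothetical stand-ins for `DorisRowConverter`, Flink's type roots, and `RowData`. It assumes a DATE field arrives as an int counting days since the epoch, as in the diff above. One converter is built per field when the object is constructed, and null handling is wrapped around it in a single place.

```java
import java.sql.Date;
import java.time.LocalDate;

/** Hypothetical stand-ins for illustration; not Flink's or the connector's real classes. */
final class RowConverterSketch {
    /** Reduced stand-in for the set of logical type roots. */
    enum TypeRoot { INTEGER, VARCHAR, DATE }

    /** Converts the field at position pos of an internal row to its external form. */
    @FunctionalInterface
    interface SerializationConverter {
        Object serialize(int pos, Object[] row);
    }

    private final SerializationConverter[] converters;

    RowConverterSketch(TypeRoot[] fieldTypes) {
        // Build each converter once, up front, instead of switching on the type per record.
        converters = new SerializationConverter[fieldTypes.length];
        for (int i = 0; i < fieldTypes.length; i++) {
            converters[i] = wrapNullable(createExternalConverter(fieldTypes[i]));
        }
    }

    static SerializationConverter createExternalConverter(TypeRoot type) {
        switch (type) {
            case INTEGER:
            case VARCHAR:
                // Pass-through types need no conversion in this sketch.
                return (pos, row) -> row[pos];
            case DATE:
                // Assumes the internal value is an int of days since 1970-01-01.
                return (pos, row) -> Date.valueOf(LocalDate.ofEpochDay((Integer) row[pos]));
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }

    static SerializationConverter wrapNullable(SerializationConverter inner) {
        // Null checks live here so the per-type converters stay simple.
        return (pos, row) -> row[pos] == null ? null : inner.serialize(pos, row);
    }

    Object convertExternal(Object[] row, int pos) {
        return converters[pos].serialize(pos, row);
    }
}
```

   With this shape, a serializer would only hold a converter instance and call `convertExternal` for each field, so the type switch no longer lives in the serializer itself.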





[GitHub] [incubator-doris-flink-connector] JNSimba commented on a diff in pull request #33: [FIX] fix flink date and timestamp type not mapping.

Posted by GitBox <gi...@apache.org>.
JNSimba commented on code in PR #33:
URL: https://github.com/apache/incubator-doris-flink-connector/pull/33#discussion_r891980901


##########
flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java:
##########
@@ -33,36 +34,32 @@
 import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;
 import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
 import static org.apache.doris.flink.sink.writer.LoadConstants.NULL_VALUE;
-import static org.apache.flink.table.data.RowData.createFieldGetter;
 
 /**
  * Serializer for RowData.
  */
 public class RowDataSerializer implements DorisRecordSerializer<RowData> {
     String[] fieldNames;
-    RowData.FieldGetter[] fieldGetters;
     String type;
     private ObjectMapper objectMapper;
     private final String fieldDelimiter;
     private final boolean enableDelete;
+    private final DorisRowConverter rowConverter;
 
-    private RowDataSerializer(String[] fieldNames, DataType[] dataTypes, String type, String fieldDelimiter, boolean enableDelete) {
+    private RowDataSerializer(String[] fieldNames, RowType rowType, String type, String fieldDelimiter, boolean enableDelete) {

Review Comment:
   I think modifying this constructor may not be a good idea, because it risks incompatibility with the current version.
   Would it be better to overload the constructor in the `DorisRowConverter` class so that it also accepts parameters of type `DataType`?
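
   As a rough illustration of the overload idea, the sketch below keeps a `DataType[]` entry point that delegates to a row-type constructor. Every class here is a hypothetical stand-in written for this example (including `LogicalType`, `DataType`, `RowType`, and `Converter`); the real Flink and connector classes have far richer APIs, and the merged change may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins for illustration; not Flink's or the connector's real classes. */
final class ConstructorOverloadSketch {
    static final class LogicalType {
        final String name;
        LogicalType(String name) { this.name = name; }
    }

    /** Stand-in for a data type that wraps a logical type. */
    static final class DataType {
        private final LogicalType logicalType;
        DataType(LogicalType logicalType) { this.logicalType = logicalType; }
        LogicalType getLogicalType() { return logicalType; }
    }

    /** Stand-in for a row type: an ordered list of field types. */
    static final class RowType {
        final List<LogicalType> fields;
        RowType(List<LogicalType> fields) { this.fields = fields; }
    }

    static final class Converter {
        final List<LogicalType> fieldTypes;

        // Constructor that works on the row type directly.
        Converter(RowType rowType) {
            this.fieldTypes = new ArrayList<>(rowType.fields);
        }

        // Overload for callers that still pass DataType[]; their signatures stay unchanged.
        Converter(DataType[] dataTypes) {
            this(toRowType(dataTypes));
        }

        private static RowType toRowType(DataType[] dataTypes) {
            List<LogicalType> fields = new ArrayList<>();
            for (DataType dataType : dataTypes) {
                fields.add(dataType.getLogicalType());
            }
            return new RowType(fields);
        }
    }
}
```

   The point of the design is that existing callers of the serializer keep passing `DataType[]`, while the conversion to a row type happens once inside the converter.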





[GitHub] [incubator-doris-flink-connector] gj-zhang commented on a diff in pull request #33: [FIX] fix flink date and timestamp type not mapping.

Posted by GitBox <gi...@apache.org>.
gj-zhang commented on code in PR #33:
URL: https://github.com/apache/incubator-doris-flink-connector/pull/33#discussion_r890801739


##########
flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java:
##########
@@ -243,8 +249,34 @@ public void convertArrowToRowBatch() throws DorisException {
                         }
                         break;
                     case "DATE":
-                    case "LARGEINT":
+                        Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
+                                typeMismatchMessage(currentType, mt));
+                        VarCharVector date = (VarCharVector) curFieldVector;
+                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
+                            if (date.isNull(rowIndex)) {
+                                addValueToRow(rowIndex, null);
+                                continue;
+                            }
+                            String value = new String(date.get(rowIndex));
+                            LocalDate localDate = LocalDate.parse(value, dateFormatter);
+                            addValueToRow(rowIndex, localDate);
+                        }
+                        break;
                     case "DATETIME":
+                        Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
+                                typeMismatchMessage(currentType, mt));
+                        VarCharVector timeStampSecVector = (VarCharVector) curFieldVector;
+                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
+                            if (timeStampSecVector.isNull(rowIndex)) {
+                                addValueToRow(rowIndex, null);
+                                continue;
+                            }
+                            String value = new String(timeStampSecVector.get(rowIndex));
+                            LocalDateTime parse = LocalDateTime.parse(value, dateTimeFormatter);
+                            addValueToRow(rowIndex, parse);
+                        }
+                        break;
+                    case "LARGEINT":

Review Comment:
   I don't think so, because this step converts Arrow data to a RowBatch.
   `org.apache.doris.flink.deserialization.converter.DorisRowConverter` then converts the RowBatch to a Flink row.
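
   The two stages described in this reply can be sketched with plain `java.time` code. This is only an illustration: the class and method names are hypothetical, and the `yyyy-MM-dd` and `yyyy-MM-dd HH:mm:ss` patterns are assumptions, since the diff does not show how `dateFormatter` and `dateTimeFormatter` are defined. Stage 1 mirrors what the `RowBatch` diff does (VARCHAR bytes to `LocalDate` or `LocalDateTime`); stage 2 stands in for the row converter, using the epoch-day int that the serializer diff reads with `row.getInt`.

```java
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Illustrative only: names are hypothetical, not connector APIs. */
final class TwoStageDateConversion {
    // Assumed text formats for DATE and DATETIME values.
    static final DateTimeFormatter DATE_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd");
    static final DateTimeFormatter DATETIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Stage 1 (Arrow to RowBatch): decode the VARCHAR bytes into java.time values.
    static LocalDate parseDate(byte[] raw) {
        return LocalDate.parse(new String(raw, StandardCharsets.UTF_8), DATE_FORMAT);
    }

    static LocalDateTime parseDateTime(byte[] raw) {
        return LocalDateTime.parse(new String(raw, StandardCharsets.UTF_8), DATETIME_FORMAT);
    }

    // Stage 2 (RowBatch to engine row): map the java.time value to an int
    // counting days since 1970-01-01.
    static int toEpochDay(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }
}
```

   Keeping stage 1 free of engine-specific types is what lets the row batch stay independent of the Flink row representation.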





[GitHub] [incubator-doris-flink-connector] gj-zhang commented on a diff in pull request #33: [FIX] fix flink date and timestamp type not mapping.

Posted by GitBox <gi...@apache.org>.
gj-zhang commented on code in PR #33:
URL: https://github.com/apache/incubator-doris-flink-connector/pull/33#discussion_r892043410


##########
flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java:
##########
@@ -125,4 +181,59 @@ protected DeserializationConverter createConverter(LogicalType type) {
                 throw new UnsupportedOperationException("Unsupported type:" + type);
         }
     }
+
+    protected SerializationConverter createExternalConverter(LogicalType type) {

Review Comment:
   done





[GitHub] [incubator-doris-flink-connector] JNSimba commented on a diff in pull request #33: [FIX] fix flink date and timestamp type not mapping.

Posted by GitBox <gi...@apache.org>.
JNSimba commented on code in PR #33:
URL: https://github.com/apache/incubator-doris-flink-connector/pull/33#discussion_r893243115


##########
flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java:
##########
@@ -17,23 +17,33 @@
 package org.apache.doris.flink.deserialization.convert;
 

Review Comment:
   Please add the license header.





[GitHub] [incubator-doris-flink-connector] JNSimba merged pull request #33: [FIX] fix flink date and timestamp type not mapping.

Posted by GitBox <gi...@apache.org>.
JNSimba merged PR #33:
URL: https://github.com/apache/incubator-doris-flink-connector/pull/33




[GitHub] [incubator-doris-flink-connector] JNSimba commented on a diff in pull request #33: [FIX] fix flink date and timestamp type not mapping.

Posted by GitBox <gi...@apache.org>.
JNSimba commented on code in PR #33:
URL: https://github.com/apache/incubator-doris-flink-connector/pull/33#discussion_r886628968


##########
flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java:
##########
@@ -243,8 +249,34 @@ public void convertArrowToRowBatch() throws DorisException {
                         }
                         break;
                     case "DATE":
-                    case "LARGEINT":
+                        Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
+                                typeMismatchMessage(currentType, mt));
+                        VarCharVector date = (VarCharVector) curFieldVector;
+                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
+                            if (date.isNull(rowIndex)) {
+                                addValueToRow(rowIndex, null);
+                                continue;
+                            }
+                            String value = new String(date.get(rowIndex));
+                            LocalDate localDate = LocalDate.parse(value, dateFormatter);
+                            addValueToRow(rowIndex, localDate);
+                        }
+                        break;
                     case "DATETIME":
+                        Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
+                                typeMismatchMessage(currentType, mt));
+                        VarCharVector timeStampSecVector = (VarCharVector) curFieldVector;
+                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
+                            if (timeStampSecVector.isNull(rowIndex)) {
+                                addValueToRow(rowIndex, null);
+                                continue;
+                            }
+                            String value = new String(timeStampSecVector.get(rowIndex));
+                            LocalDateTime parse = LocalDateTime.parse(value, dateTimeFormatter);
+                            addValueToRow(rowIndex, parse);
+                        }
+                        break;
+                    case "LARGEINT":

Review Comment:
   Would it be better to handle the Flink-related type conversions in `org.apache.doris.flink.deserialization.converter.DorisRowConverter` instead of here?





[GitHub] [incubator-doris-flink-connector] gj-zhang commented on a diff in pull request #33: [FIX] fix flink date and timestamp type not mapping.

Posted by GitBox <gi...@apache.org>.
gj-zhang commented on code in PR #33:
URL: https://github.com/apache/incubator-doris-flink-connector/pull/33#discussion_r890977273


##########
flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java:
##########
@@ -17,16 +17,15 @@
 package org.apache.doris.flink.deserialization.converter;
 
 import org.apache.doris.flink.serialization.RowBatch;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.*;

Review Comment:
   done





[GitHub] [incubator-doris-flink-connector] JNSimba commented on a diff in pull request #33: [FIX] fix flink date and timestamp type not mapping.

Posted by GitBox <gi...@apache.org>.
JNSimba commented on code in PR #33:
URL: https://github.com/apache/incubator-doris-flink-connector/pull/33#discussion_r891985882


##########
flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java:
##########
@@ -125,4 +181,59 @@ protected DeserializationConverter createConverter(LogicalType type) {
                 throw new UnsupportedOperationException("Unsupported type:" + type);
         }
     }
+
+    protected SerializationConverter createExternalConverter(LogicalType type) {

Review Comment:
   Unit tests for this method can be added to `DorisRowConverterTest`.


