Posted to commits@carbondata.apache.org by in...@apache.org on 2020/12/18 10:19:31 UTC

[carbondata] branch master updated: [CARBONDATA-4071] Fix wrong values of date or timestamp child columns on reading through SDK

This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new eeee018  [CARBONDATA-4071] Fix wrong values of date or timestamp child columns on reading through SDK
eeee018 is described below

commit eeee018145154d0b08310b5e08fb6ae2a884832f
Author: Karan980 <ka...@gmail.com>
AuthorDate: Mon Dec 7 19:32:48 2020 +0530

    [CARBONDATA-4071] Fix wrong values of date or timestamp child columns on reading through SDK
    
    Why is this PR needed?
    When a date or timestamp column is present inside a complex column (for example, Array(Date)),
    it gives wrong results when read through the SDK.
    
    What changes were proposed in this PR?
    Fix the conversion of INT values into date and LONG values into timestamp for such child
    columns, as sketched below.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4046
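
For context, a DATE value is stored internally as an int count of days since the Unix epoch and a
TIMESTAMP value as a long in microseconds; the reader has to turn both into formatted strings. A
minimal, self-contained sketch of that conversion (the constant and the format patterns are
assumptions that mirror the reader's defaults; the sketch uses no CarbonData classes):

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class DateTimestampConversionSketch {
      // Assumed constant: one day in milliseconds, as used by the direct-dictionary generator.
      private static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

      public static void main(String[] args) {
        // Assumed default formats; CarbonData allows overriding them through CarbonProperties.
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        SimpleDateFormat timeStampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        int dateAsDaysSinceEpoch = 17957;           // a DATE as stored: days since 1970-01-01
        long timestampInMicros = 1549940614000000L; // a TIMESTAMP as stored: microseconds

        // DATE: days -> milliseconds -> formatted string
        String date = dateFormat.format(new Date(MILLIS_PER_DAY * dateAsDaysSinceEpoch));
        // TIMESTAMP: microseconds -> milliseconds -> formatted string
        String timestamp = timeStampFormat.format(new Date(timestampInMicros / 1000));

        // Prints "2019-03-02 / 2019-02-12 03:03:34" when the JVM timezone is UTC.
        System.out.println(date + " / " + timestamp);
      }
    }
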
---
 .../apache/carbondata/sdk/file/CarbonReader.java   |  85 ++++++-
 .../carbondata/sdk/file/CarbonReaderTest.java      | 267 +++++++++++++++++++++
 2 files changed, 339 insertions(+), 13 deletions(-)

diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
index c3ebdc3..7799de4 100644
--- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -126,6 +127,75 @@ public class CarbonReader<T> {
     return formatDateAndTimeStamp((Object []) row);
   }
 
+  /**
+   * This method converts the date and timestamp columns into the right format. Before conversion,
+   * a date is present as an integer and a timestamp as a long. This method also flattens complex
+   * columns and formats the date/timestamp children present in them.
+   */
+  public Object getFormattedData(CarbonDimension dimension, Object row, SimpleDateFormat dateFormat,
+      SimpleDateFormat timeStampFormat) {
+    ColumnSchema columnSchema = dimension.getColumnSchema();
+    if (row != null && columnSchema != null) {
+      DataType dataType = columnSchema.getDataType();
+      if (dataType == DataTypes.DATE) {
+        return dateFormat
+            .format(new Date(DateDirectDictionaryGenerator.MILLIS_PER_DAY * (int) row));
+      } else if (dataType == DataTypes.TIMESTAMP) {
+        return timeStampFormat.format(new Date((long) row / 1000));
+      } else if (dataType.isComplexType()) {
+        List<CarbonDimension> listOfChildDimensions = dimension.getListOfChildDimensions();
+        Object[] childDimensionFormattedValues = new Object[((Object[]) row).length];
+        if (listOfChildDimensions != null && listOfChildDimensions.size() > 0) {
+          int i = 0;
+          if (DataTypes.isArrayType(dataType)) {
+            DataType childDataType = listOfChildDimensions.get(0).getColumnSchema().getDataType();
+            if (childDataType == DataTypes.DATE || childDataType == DataTypes.TIMESTAMP
+                || childDataType.isComplexType()) {
+              for (Object val : (Object[]) row) {
+                childDimensionFormattedValues[i] =
+                   getFormattedData(listOfChildDimensions.get(0), val, dateFormat, timeStampFormat);
+                i++;
+              }
+            } else {
+              return row;
+            }
+          } else if (DataTypes.isStructType(dataType)) {
+            for (Object val : (Object[]) row) {
+              childDimensionFormattedValues[i] =
+                  getFormattedData(listOfChildDimensions.get(i), val, dateFormat, timeStampFormat);
+              i++;
+            }
+          } else if (DataTypes.isMapType(dataType)) {
+            CarbonDimension childDimension = listOfChildDimensions.get(0);
+            ColumnSchema childSchema = childDimension.getColumnSchema();
+            DataType childDataType = childSchema.getDataType();
+            if (DataTypes.isStructType(childDataType)) {
+              /* A map is returned as an array of keys and values, so convert the map children
+               * (key and value) into an array dimension before processing.
+               */
+              List<CarbonDimension> mapChilds = childDimension.getListOfChildDimensions();
+              ColumnSchema arraySchema = childSchema.clone();
+              arraySchema.setDataType(
+                  DataTypes.createArrayType(mapChilds.get(i).getColumnSchema().getDataType()));
+              CarbonDimension arrayDimension = new CarbonDimension(arraySchema,
+                  childDimension.getOrdinal(), childDimension.getKeyOrdinal(),
+                  childDimension.getSchemaOrdinal());
+              for (Object val : (Object[]) row) {
+                arrayDimension.initializeChildDimensionsList(1);
+                arrayDimension.getListOfChildDimensions().add(mapChilds.get(i));
+                childDimensionFormattedValues[i] =
+                    getFormattedData(arrayDimension, val, dateFormat, timeStampFormat);
+                i++;
+              }
+            }
+          }
+          return childDimensionFormattedValues;
+        }
+      }
+    }
+    return row;
+  }
+
   public T formatDateAndTimeStamp(Object[] row) {
     List<ProjectionDimension> dimensions = ((AbstractRecordReader) currentReader)
             .getQueryModel().getProjectionDimensions();
@@ -142,19 +212,8 @@ public class CarbonReader<T> {
     }
     SimpleDateFormat timeStampFormat = new SimpleDateFormat(carbonTimeStampFormat);
     for (ProjectionDimension dimension : dimensions) {
-      ColumnSchema columnSchema = dimension.getDimension().getColumnSchema();
-      if (columnSchema == null) {
-        continue;
-      }
-      DataType dataType = columnSchema.getDataType();
-      if (dataType == DataTypes.DATE) {
-        row[dimension.getOrdinal()] = dateFormat
-                .format(new Date(DateDirectDictionaryGenerator.MILLIS_PER_DAY
-                        * (int)row[dimension.getOrdinal()]));
-      } else if (dataType == DataTypes.TIMESTAMP) {
-        row[dimension.getOrdinal()] = timeStampFormat
-                .format(new Date((long)row[dimension.getOrdinal()] / 1000));
-      }
+      row[dimension.getOrdinal()] = getFormattedData(dimension.getDimension(),
+          row[dimension.getOrdinal()], dateFormat, timeStampFormat);
     }
     return (T)row;
   }
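
With this change, values returned by the SDK reader are formatted strings for DATE and TIMESTAMP
even when they are nested inside complex columns. A minimal read-side sketch (the store path and
column names are assumptions; the reader calls mirror those used in the tests below):

    import org.apache.carbondata.sdk.file.CarbonReader;

    public class ReadComplexDatesSketch {
      public static void main(String[] args) throws Exception {
        // Assumed path to a store that has an Array(DATE) column named "arrayFieldDate".
        CarbonReader reader = CarbonReader.builder("./testWriteFiles")
            .projection(new String[] { "stringField", "arrayFieldDate" })
            .build();
        while (reader.hasNext()) {
          Object[] row = (Object[]) reader.readNextRow();
          // Each element of the array child now arrives as a formatted string such as
          // "2019-03-02" instead of the raw surrogate integer returned before this fix.
          Object[] dates = (Object[]) row[1];
          for (Object date : dates) {
            System.out.println(row[0] + " -> " + date);
          }
        }
        reader.close();
      }
    }
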
diff --git a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index 33ac9a2..1433bb5 100644
--- a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.Field;
+import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
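
The new tests below exercise DATE and TIMESTAMP children inside map, array and struct columns. For
reference, a minimal write-side sketch of declaring such a schema through the SDK (field names and
the sample row are assumptions; the builder calls and the '\u0001' element delimiter mirror the
tests that follow):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.carbondata.core.metadata.datatype.DataTypes;
    import org.apache.carbondata.core.metadata.datatype.Field;
    import org.apache.carbondata.core.metadata.datatype.StructField;
    import org.apache.carbondata.sdk.file.CarbonWriter;
    import org.apache.carbondata.sdk.file.Schema;

    public class WriteComplexDatesSketch {
      public static void main(String[] args) throws Exception {
        Field[] fields = new Field[3];
        fields[0] = new Field("dateField", DataTypes.DATE);
        fields[1] = new Field("arrayFieldTimestamp",
            DataTypes.createArrayType(DataTypes.TIMESTAMP));
        List<StructField> structFields = new ArrayList<>();
        structFields.add(new StructField("creationDate", DataTypes.DATE));
        fields[2] = new Field("structField", DataTypes.createStructType(structFields));

        CarbonWriter writer = CarbonWriter.builder()
            .outputPath("./testWriteFiles")
            .withCsvInput(new Schema(fields))
            .writtenBy("WriteComplexDatesSketch")
            .build();
        // With the default complex delimiters, '\u0001' separates array elements.
        writer.write(new String[] { "2019-03-02",
            "2019-02-12 03:03:34\u00012019-02-12 03:03:38", "2019-03-02" });
        writer.close();
      }
    }
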
@@ -1742,6 +1743,272 @@ public class CarbonReaderTest extends TestCase {
   }
 
   @Test
+  public void testReadDateAndTimestampColumnInMap() {
+    String path = "./testWriteFiles";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+      Field[] fields = new Field[6];
+      fields[0] = new Field("stringField", DataTypes.STRING);
+      fields[1] = new Field("shortField", DataTypes.SHORT);
+      fields[2] = new Field("dateField", DataTypes.DATE);
+      fields[3] = new Field("timeField", DataTypes.TIMESTAMP);
+      fields[4] = new Field("varcharField", DataTypes.VARCHAR);
+      fields[5] =
+          new Field("mapType", DataTypes.createMapType(DataTypes.TIMESTAMP, DataTypes.DATE));
+      CarbonWriter writer = CarbonWriter.builder().outputPath(path).withCsvInput(new Schema(fields))
+          .writtenBy("CarbonReaderTest").build();
+
+      for (int i = 0; i < 10; i++) {
+        String[] row2 = new String[] { "robot" + (i % 10), String.valueOf(i % 10000), "2019-03-02",
+            "2019-02-12 03:03:34", "varchar", "2019-03-30 17:22:31\u00022019-03-30"
+            + "\u00012019-03-30 17:22:32\u00022019-03-10\u00012019-03-30 17:22:33\u00022019-03-14"
+            + "\u00012019-03-30 17:22:36\u00022019-03-18" };
+        writer.write(row2);
+      }
+      writer.close();
+      File[] dataFiles = new File(path).listFiles(new FilenameFilter() {
+        @Override
+        public boolean accept(File dir, String name) {
+          if (name == null) {
+            return false;
+          }
+          return name.endsWith("carbondata");
+        }
+      });
+      if (dataFiles == null || dataFiles.length < 1) {
+        throw new RuntimeException("Carbon data file does not exist.");
+      }
+      Schema schema = CarbonSchemaReader.readSchema(dataFiles[0].getAbsolutePath()).asOriginOrder();
+      // Transform the schema
+      String[] strings = new String[schema.getFields().length];
+      for (int i = 0; i < schema.getFields().length; i++) {
+        strings[i] = (schema.getFields())[i].getFieldName();
+      }
+      // Read data
+      CarbonReader reader = CarbonReader.builder(path).projection(strings).build();
+      Object[] mapKeValue = new Object[2];
+      mapKeValue[0] = new Object[] { "2019-03-30 17:22:36", "2019-03-30 17:22:33",
+          "2019-03-30 17:22:32", "2019-03-30 17:22:31" };
+      mapKeValue[1] = new Object[] { "2019-03-18", "2019-03-14", "2019-03-10", "2019-03-30" };
+      int i = 0;
+      while (reader.hasNext()) {
+        Object[] row = (Object[]) reader.readNextRow();
+        assert (row[0].equals("robot" + i));
+        assert (row[2].equals("2019-03-02"));
+        assert (row[3].equals("2019-02-12 03:03:34"));
+        Assert.assertArrayEquals(mapKeValue, (Object[]) row[5]);
+        i++;
+      }
+      Assert.assertEquals(i, 10);
+      reader.close();
+      FileUtils.deleteDirectory(new File(path));
+    } catch (Throwable e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testReadDateAndTimestampColumnInArray() {
+    String path = "./testWriteFiles";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+      Field[] fields = new Field[7];
+      fields[0] = new Field("stringField", DataTypes.STRING);
+      fields[1] = new Field("shortField", DataTypes.SHORT);
+      fields[2] = new Field("dateField", DataTypes.DATE);
+      fields[3] = new Field("timeField", DataTypes.TIMESTAMP);
+      fields[4] = new Field("varcharField", DataTypes.VARCHAR);
+      fields[5] = new Field("arrayFieldDate", DataTypes.createArrayType(DataTypes.DATE));
+      fields[6] = new Field("arrayFieldTimestamp", DataTypes.createArrayType(DataTypes.TIMESTAMP));
+      Map<String, String> map = new HashMap<>();
+      map.put("complex_delimiter_level_1", "#");
+      CarbonWriter writer = CarbonWriter.builder().outputPath(path).withLoadOptions(map)
+          .withCsvInput(new Schema(fields)).writtenBy("CarbonReaderTest").build();
+
+      for (int i = 0; i < 10; i++) {
+        String[] row2 = new String[] { "robot" + (i % 10), String.valueOf(i % 10000), "2019-03-02",
+            "2019-02-12 03:03:34", "varchar", "2019-03-02#2019-03-03#2019-03-04#2019-03-05",
+            "2019-02-12 03:03:34#2019-02-12 03:03:38#2019-02-12 03:03:41#2019-02-12 03:12:34" };
+        writer.write(row2);
+      }
+      writer.close();
+      File[] dataFiles = new File(path).listFiles(new FilenameFilter() {
+        @Override
+        public boolean accept(File dir, String name) {
+          if (name == null) {
+            return false;
+          }
+          return name.endsWith("carbondata");
+        }
+      });
+      if (dataFiles == null || dataFiles.length < 1) {
+        throw new RuntimeException("Carbon data file does not exist.");
+      }
+      Schema schema = CarbonSchemaReader.readSchema(dataFiles[0].getAbsolutePath()).asOriginOrder();
+      // Transform the schema
+      String[] strings = new String[schema.getFields().length];
+      for (int i = 0; i < schema.getFields().length; i++) {
+        strings[i] = (schema.getFields())[i].getFieldName();
+      }
+      // Read data
+      CarbonReader reader = CarbonReader.builder(path).projection(strings).build();
+      Object[] arrDate = new Object[] { "2019-03-02", "2019-03-03", "2019-03-04", "2019-03-05" };
+      Object[] arrTimestamp = new Object[] { "2019-02-12 03:03:34", "2019-02-12 03:03:38",
+          "2019-02-12 03:03:41", "2019-02-12 03:12:34" };
+
+      int i = 0;
+      while (reader.hasNext()) {
+        Object[] row = (Object[]) reader.readNextRow();
+        assert (row[0].equals("robot" + i));
+        assert (row[2].equals("2019-03-02"));
+        assert (row[3].equals("2019-02-12 03:03:34"));
+        Assert.assertArrayEquals(arrDate, (Object[]) row[5]);
+        Assert.assertArrayEquals(arrTimestamp, (Object[]) row[6]);
+        i++;
+      }
+      Assert.assertEquals(i, 10);
+      reader.close();
+      FileUtils.deleteDirectory(new File(path));
+    } catch (Throwable e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testReadDateAndTimestampColumnInStruct() {
+    String path = "./testWriteFiles";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+      Field[] fields = new Field[3];
+      fields[0] = new Field("name", DataTypes.STRING);
+      fields[1] = new Field("age", DataTypes.INT);
+      ArrayList<StructField> structFields = new ArrayList<>();
+      structFields.add(new StructField("dateField", DataTypes.DATE));
+      structFields.add(new StructField("timestampField", DataTypes.TIMESTAMP));
+      fields[2] = new Field("structField", DataTypes.createStructType(structFields));
+      Map<String, String> map = new HashMap<>();
+      map.put("complex_delimiter_level_1", "#");
+      CarbonWriter writer = CarbonWriter.builder().outputPath(path).withLoadOptions(map)
+          .withCsvInput(new Schema(fields)).writtenBy("CarbonReaderTest").build();
+
+      for (int i = 0; i < 10; i++) {
+        String[] row2 = new String[] { "robot" + (i % 10), String.valueOf(i % 10000),
+            "2019-03-02#2019-02-12 03:12:34" };
+        writer.write(row2);
+      }
+      writer.close();
+      File[] dataFiles = new File(path).listFiles(new FilenameFilter() {
+        @Override
+        public boolean accept(File dir, String name) {
+          if (name == null) {
+            return false;
+          }
+          return name.endsWith("carbondata");
+        }
+      });
+      if (dataFiles == null || dataFiles.length < 1) {
+        throw new RuntimeException("Carbon data file does not exist.");
+      }
+      Schema schema = CarbonSchemaReader.readSchema(dataFiles[0].getAbsolutePath()).asOriginOrder();
+      // Transform the schema
+      String[] strings = new String[schema.getFields().length];
+      for (int i = 0; i < schema.getFields().length; i++) {
+        strings[i] = (schema.getFields())[i].getFieldName();
+      }
+      // Read data
+      CarbonReader reader = CarbonReader.builder(path).projection(strings).build();
+      int i = 0;
+      while (reader.hasNext()) {
+        Object[] row = (Object[]) reader.readNextRow();
+        assert (row[0].equals("robot" + i));
+        Object[] arr = (Object[]) row[2];
+        assert (arr[0].equals("2019-03-02"));
+        assert (arr[1].equals("2019-02-12 03:12:34"));
+        i++;
+      }
+      Assert.assertEquals(i, 10);
+      reader.close();
+      FileUtils.deleteDirectory(new File(path));
+    } catch (Throwable e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testReadingDateAndTimestampColumnInArrayOfStruct() throws IOException {
+    String path = "./testWriteFilesArrayStruct";
+    FileUtils.deleteDirectory(new File(path));
+    Field[] fields = new Field[4];
+    fields[0] = new Field("id", DataTypes.STRING);
+    fields[1] = new Field("source", DataTypes.STRING);
+    fields[2] = new Field("usage", DataTypes.STRING);
+    List<StructField> structFieldsList = new ArrayList<>();
+    structFieldsList.add(new StructField("name", DataTypes.STRING));
+    structFieldsList.add(new StructField("type", DataTypes.STRING));
+    structFieldsList.add(new StructField("creation-date", DataTypes.DATE));
+    structFieldsList.add(new StructField("creation-timestamp", DataTypes.TIMESTAMP));
+    StructField structTypeByList =
+        new StructField("annotation", DataTypes.createStructType(structFieldsList),
+            structFieldsList);
+    List<StructField> list = new ArrayList<>();
+    list.add(structTypeByList);
+    Field arrayType = new Field("annotations", "array", list);
+    fields[3] = arrayType;
+    try {
+      CarbonWriter writer = CarbonWriter.builder().outputPath(path).withCsvInput(new Schema(fields))
+          .writtenBy("complexTest").build();
+      for (int i = 0; i < 15; i++) {
+        String[] row = new String[] { "robot" + i, String.valueOf(i), i + "." + i,
+            "sunflowers" + (i % 10) + "\002" + "modelarts/image_classification" + "\002"
+                + "2019-03-30" + "\002" + "2019-03-30 17:22:31" + "\001" + "roses" + (i % 10)
+                + "\002" + "modelarts/image_classification" + "\002" + "2019-03-30" + "\002"
+                + "2019-03-30 17:22:31" };
+        writer.write(row);
+      }
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+    Schema schema = CarbonSchemaReader.readSchema(path).asOriginOrder();
+    assert (4 == schema.getFieldsLength());
+    CarbonReader reader = null;
+    try {
+      reader = CarbonReader.builder(path)
+          .projection(new String[] { "id", "source", "usage", "annotations" }).build();
+      int i = 0;
+      while (reader.hasNext()) {
+        Object[] row = (Object[]) reader.readNextRow();
+        assert (4 == row.length);
+        assert (row[0].equals("robot" + i));
+        int value = Integer.valueOf((String) row[1]);
+        Float value2 = Float.valueOf((String) row[2]);
+        assert (value > -1 && value < 15);
+        assert (value2 > -1 && value2 < 15);
+        Object[] annotations = (Object[]) row[3];
+        for (int j = 0; j < annotations.length; j++) {
+          Object[] annotation = (Object[]) annotations[j];
+          assert (((String) annotation[0]).contains("sunflowers") || ((String) annotation[0])
+              .contains("roses"));
+          assert (((String) annotation[1]).contains("modelarts/image_classification"));
+          assert (annotation[2].equals("2019-03-30"));
+          assert (annotation[3].equals("2019-03-30 17:22:31"));
+        }
+        i++;
+      }
+      assert (15 == i);
+      reader.close();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } finally {
+      FileUtils.deleteDirectory(new File(path));
+    }
+  }
+
+  @Test
   public void testReadNextRowWithRowUtil() {
     String path = "./carbondata";
     try {