You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/10/27 13:49:28 UTC

[iotdb] branch IOTDB-4780 created (now 6b61cda09e)

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a change to branch IOTDB-4780
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 6b61cda09e temp

This branch includes the following new commits:

     new 6b61cda09e temp

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: temp

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-4780
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6b61cda09e8ddb6fcf1b9896335be4365cab6cfb
Author: LiuXuxin <li...@outlook.com>
AuthorDate: Thu Oct 27 21:48:13 2022 +0800

    temp
---
 .../java/org/apache/iotdb/RewriteTsFileTool.java   | 94 +++++++++++++++++++++-
 1 file changed, 91 insertions(+), 3 deletions(-)

diff --git a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
index f403f60bfd..d3851bccdb 100644
--- a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
+++ b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -97,6 +98,10 @@ public class RewriteTsFileTool {
   private static String filePath = "";
   private static String readMode = "s";
   private static boolean ignoreBrokenChunk = false;
+  private static boolean writeAsAligned = true;
+
+  private static Map<String, MeasurementSchema> schemaMap = new HashMap<>();
+  private static Map<Long, Map<String, TsPrimitiveType>> timeValuePairMap = new HashMap<>();
 
   public static void main(String[] args) {
     Session session = null;
@@ -693,6 +698,60 @@ public class RewriteTsFileTool {
             while (seriesIterator.hasNextSeries()) {
               writeSingleSeries(device, seriesIterator, session);
             }
+            if (writeAsAligned) {
+              List<MeasurementSchema> schemas = new ArrayList<>(schemaMap.values());
+              Tablet tablet = new Tablet(device, schemas, MAX_TABLET_LENGTH);
+              for (Map.Entry<Long, Map<String, TsPrimitiveType>> entry :
+                  timeValuePairMap.entrySet()) {
+                tablet.addTimestamp(tablet.rowSize, entry.getKey());
+                Map<String, TsPrimitiveType> valueMap = entry.getValue();
+                for (MeasurementSchema schema : schemas) {
+                  TsPrimitiveType tsPrimitiveType =
+                      valueMap.getOrDefault(schema.getMeasurementId(), null);
+                  if (tsPrimitiveType != null) {
+                    switch (tsPrimitiveType.getDataType()) {
+                      case BOOLEAN:
+                        tablet.addValue(
+                            schema.getMeasurementId(),
+                            tablet.rowSize,
+                            tsPrimitiveType.getBoolean());
+                        break;
+                      case INT32:
+                        tablet.addValue(
+                            schema.getMeasurementId(), tablet.rowSize, tsPrimitiveType.getInt());
+                        break;
+                      case INT64:
+                        tablet.addValue(
+                            schema.getMeasurementId(), tablet.rowSize, tsPrimitiveType.getLong());
+                        break;
+                      case FLOAT:
+                        tablet.addValue(
+                            schema.getMeasurementId(), tablet.rowSize, tsPrimitiveType.getFloat());
+                        break;
+                      case DOUBLE:
+                        tablet.addValue(
+                            schema.getMeasurementId(), tablet.rowSize, tsPrimitiveType.getDouble());
+                        break;
+                      case TEXT:
+                        tablet.addValue(
+                            schema.getMeasurementId(),
+                            tablet.rowSize,
+                            tsPrimitiveType.getStringValue());
+                        break;
+                    }
+                  }
+                }
+                tablet.rowSize++;
+                if (tablet.rowSize > MAX_TABLET_LENGTH) {
+                  session.insertAlignedTablet(tablet);
+                  tablet.reset();
+                }
+              }
+              if (tablet.rowSize > 0) {
+                session.insertAlignedTablet(tablet);
+                tablet.reset();
+              }
+            }
           }
         }
       }
@@ -704,9 +763,10 @@ public class RewriteTsFileTool {
   protected static void writeSingleSeries(
       String device, MultiTsFileDeviceIterator.MeasurementIterator seriesIterator, Session session)
       throws IllegalPathException {
-    PartialPath p = new PartialPath(device, seriesIterator.nextSeries());
+    //    PartialPath p = new PartialPath(device, seriesIterator.nextSeries());
     LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList =
         seriesIterator.getMetadataListForCurrentSeries();
+    String series = seriesIterator.nextSeries();
     while (!readerAndChunkMetadataList.isEmpty()) {
       Pair<TsFileSequenceReader, List<ChunkMetadata>> readerMetadataPair =
           readerAndChunkMetadataList.removeFirst();
@@ -714,16 +774,44 @@ public class RewriteTsFileTool {
       List<ChunkMetadata> chunkMetadataList = readerMetadataPair.right;
       for (ChunkMetadata chunkMetadata : chunkMetadataList) {
         try {
-          writeSingleChunk(device, p, chunkMetadata, reader, session);
+          //          if (!writeAsAligned) {
+          //            writeSingleChunk(device, p, chunkMetadata, reader, session);
+          //          } else {
+          cacheForAligned(device, series, chunkMetadata, reader);
+          //          }
         } catch (Throwable t) {
           // this is a broken chunk, skip it
           t.printStackTrace();
-          System.out.printf("Skip broken chunk in device %s.%s%n", device, p.getMeasurement());
+          System.out.printf("Skip broken chunk in device %s.%s%n", device, series);
         }
       }
     }
   }
 
+  protected static void cacheForAligned(
+      String device, String measurement, ChunkMetadata chunkMetadata, TsFileSequenceReader reader)
+      throws IOException, IoTDBConnectionException, StatementExecutionException {
+    Chunk chunk = reader.readMemChunk(chunkMetadata);
+    ChunkHeader chunkHeader = chunk.getHeader();
+    MeasurementSchema schema =
+        new MeasurementSchema(
+            measurement,
+            chunkHeader.getDataType(),
+            chunkHeader.getEncodingType(),
+            CompressionType.GZIP);
+    schemaMap.computeIfAbsent(measurement, x -> schema);
+    IChunkReader chunkReader = new ChunkReader(chunk, null);
+    while (chunkReader.hasNextSatisfiedPage()) {
+      IPointReader batchIterator = chunkReader.nextPageData().getBatchDataIterator();
+      while (batchIterator.hasNextTimeValuePair()) {
+        TimeValuePair timeValuePair = batchIterator.nextTimeValuePair();
+        timeValuePairMap
+            .computeIfAbsent(timeValuePair.getTimestamp(), x -> new HashMap<>())
+            .put(measurement, timeValuePair.getValue());
+      }
+    }
+  }
+
   /** Read and write a single chunk for not aligned series. */
   protected static void writeSingleChunk(
       String device,