You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/05/10 00:46:41 UTC

[incubator-iotdb] branch refactor_overflow updated (3f81ff0 -> 8dbf279)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a change to branch refactor_overflow
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


    from 3f81ff0  Merge remote-tracking branch 'origin/refactor_bufferwrite_new' into refactor_overflow
     new d98144c  temporary commit
     new 8dbf279  tmp commit

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 iotdb/pom.xml                                      | 20 +++++++++
 .../org/apache/cassandra/utils/ObjectSizes.java    |  2 +-
 .../db/engine/overflowdata/OverflowProcessor.java  |  7 +--
 .../db/engine/sgmanager/StorageGroupProcessor.java | 51 ++++------------------
 .../db/engine/tsfiledata/TsFileProcessor.java      | 25 ++++++-----
 .../engine/overflowdata/OverflowProcessorTest.java |  2 +
 .../db/engine/tsfiledata/TsFileProcessorTest.java  |  2 +-
 7 files changed, 51 insertions(+), 58 deletions(-)
 rename iotdb/src/main/{java => cassandra}/org/apache/cassandra/utils/ObjectSizes.java (99%)


[incubator-iotdb] 01/02: temporary commit

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch refactor_overflow
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit d98144c499f0edbe2800c2a6b8ed9ebe69cefd62
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Wed May 8 15:14:14 2019 +0800

    temporary commit
---
 iotdb/pom.xml                                        | 20 ++++++++++++++++++++
 .../org/apache/cassandra/utils/ObjectSizes.java      |  2 +-
 .../db/engine/overflowdata/OverflowProcessor.java    |  5 +++++
 .../iotdb/db/engine/tsfiledata/TsFileProcessor.java  | 15 +++++----------
 .../engine/overflowdata/OverflowProcessorTest.java   |  2 ++
 .../db/engine/tsfiledata/TsFileProcessorTest.java    |  2 +-
 6 files changed, 34 insertions(+), 12 deletions(-)

diff --git a/iotdb/pom.xml b/iotdb/pom.xml
index 5552876..c531846 100644
--- a/iotdb/pom.xml
+++ b/iotdb/pom.xml
@@ -79,6 +79,26 @@
     </dependencies>
     <build>
         <plugins>
+            <!-- add cassandra's source file -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.4</version>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>${basedir}/src/main/cassandra</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
             <plugin>
                 <artifactId>maven-clean-plugin</artifactId>
                 <configuration>
diff --git a/iotdb/src/main/java/org/apache/cassandra/utils/ObjectSizes.java b/iotdb/src/main/cassandra/org/apache/cassandra/utils/ObjectSizes.java
similarity index 99%
rename from iotdb/src/main/java/org/apache/cassandra/utils/ObjectSizes.java
rename to iotdb/src/main/cassandra/org/apache/cassandra/utils/ObjectSizes.java
index 1ca3f97..2bddd9e 100644
--- a/iotdb/src/main/java/org/apache/cassandra/utils/ObjectSizes.java
+++ b/iotdb/src/main/cassandra/org/apache/cassandra/utils/ObjectSizes.java
@@ -164,4 +164,4 @@ public class ObjectSizes
   {
     return meter.measure(pojo);
   }
-}
+}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
index 6c31247..14b68c0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.directories.Directories;
 import org.apache.iotdb.db.engine.bufferwrite.Action;
 import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
@@ -75,6 +76,10 @@ public class OverflowProcessor extends TsFileProcessor {
     return true;
   }
 
+  @Override
+  protected String getLogSuffix() {
+    return IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX;
+  }
   public OperationResult update(UpdatePlan plan) {
     return null;
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
index 3a5bc1f..22c5d73 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
@@ -221,7 +221,7 @@ public class TsFileProcessor extends Processor {
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
       try {
         logNode = MultiFileLogNodeManager.getInstance().getNode(
-            processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX,
+            processorName + getLogSuffix(),
             writer.getRestoreFilePath(),
             FileNodeManager.getInstance().getRestoreFilePath(processorName));
       } catch (IOException e) {
@@ -794,17 +794,12 @@ public class TsFileProcessor extends Processor {
     return Directories.getInstance().getNextFolderForTsfile();
   }
 
+  protected String getLogSuffix() {
+    return IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX;
+  }
+
   @Override
   public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (!(o instanceof TsFileProcessor)) {
-      return false;
-    }
-    if (!super.equals(o)) {
-      return false;
-    }
     return this == o;
   }
 
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessorTest.java
index c568fc3..8eb681a 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessorTest.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.sgmanager.OperationResult;
 import org.apache.iotdb.db.engine.tsfiledata.TsFileProcessorTest;
 import org.apache.iotdb.db.engine.version.SysTimeVersionController;
@@ -48,6 +49,7 @@ import org.junit.Test;
 public class OverflowProcessorTest extends TsFileProcessorTest {
   @Before
   public void setUp() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setEnableWal(true);
     super.setUp();
     processor.close();
     processor = new OverflowProcessor("root.test", doNothingAction, doNothingAction, doNothingAction,
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java
index 6399f55..ac90e75 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessorTest.java
@@ -80,7 +80,7 @@ public class TsFileProcessorTest {
   public void setUp() throws Exception {
     EnvironmentUtils.envSetUp();
 //  now we do not support wal because it need to modify the wal module.
-//  IoTDBDescriptor.getInstance().getConfig().setEnableWal(true);
+    IoTDBDescriptor.getInstance().getConfig().setEnableWal(true);
     IoTDBDescriptor.getInstance().getConfig().setBufferwriteFileSizeThreshold(2*1024*1024);
     mManager = MManager.getInstance();
     queryManager = new EngineQueryRouter();


[incubator-iotdb] 02/02: tmp commit

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch refactor_overflow
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 8dbf279777d038cc7f773730b4a646ca6c196752
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Fri May 10 08:46:30 2019 +0800

    tmp commit
---
 .../db/engine/overflowdata/OverflowProcessor.java  |  4 --
 .../db/engine/sgmanager/StorageGroupProcessor.java | 51 ++++------------------
 .../db/engine/tsfiledata/TsFileProcessor.java      | 10 +++++
 3 files changed, 18 insertions(+), 47 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
index 14b68c0..22f28dd 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflowdata/OverflowProcessor.java
@@ -80,9 +80,5 @@ public class OverflowProcessor extends TsFileProcessor {
   protected String getLogSuffix() {
     return IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX;
   }
-  public OperationResult update(UpdatePlan plan) {
-    return null;
-  }
-
 
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
index 70d2ec0..ed2fdca 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/sgmanager/StorageGroupProcessor.java
@@ -72,12 +72,9 @@ import org.slf4j.LoggerFactory;
 public class StorageGroupProcessor extends Processor implements IStatistic {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(StorageGroupProcessor.class);
-  private static final String RESTORE_FILE_SUFFIX = ".restore";
 
   TsFileProcessor tsFileProcessor;
   OverflowProcessor overflowProcessor;
-  private FileNodeProcessorStore fileNodeProcessorStore;
-  String fileNodeProcessorStoreFilePath;
 
   //the version controller is shared by tsfile and overflow processor.
   private VersionController versionController;
@@ -97,31 +94,19 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
 
     this.fileSchema = constructFileSchema(processorName);
 
-    File restoreFolder = new File(IoTDBDescriptor.getInstance().getConfig().getFileNodeDir(),
+    File systemFolder = new File(IoTDBDescriptor.getInstance().getConfig().getFileNodeDir(),
         processorName);
-    if (!restoreFolder.exists()) {
-      restoreFolder.mkdirs();
-      LOGGER.info("The restore directory of the filenode processor {} doesn't exist. Create new " +
-              "directory {}",
-          getProcessorName(), restoreFolder.getAbsolutePath());
+    if (!systemFolder.exists()) {
+      systemFolder.mkdirs();
+      LOGGER.info("The directory of the filenode processor {} doesn't exist. Create new directory {}",
+          getProcessorName(), systemFolder.getAbsolutePath());
     }
-    versionController = new SimpleFileVersionController(restoreFolder.getAbsolutePath());
+    versionController = new SimpleFileVersionController(systemFolder.getAbsolutePath());
     tsFileProcessor = new TsFileProcessor(processorName, beforeFlushAction, afterFlushAction,
         afterCloseAction, versionController, fileSchema);
     overflowProcessor = new OverflowProcessor(processorName, beforeFlushAction, afterFlushAction,
         afterCloseAction, versionController, fileSchema);
 
-    fileNodeProcessorStoreFilePath = new File(restoreFolder, processorName + RESTORE_FILE_SUFFIX)
-        .getPath();
-    try {
-      readStoreFromDisk();
-    } catch (FileNodeProcessorException | IOException e) {
-      LOGGER.error(
-          "The fileNode processor {} encountered an error when recoverying restore " +
-              "information.", processorName);
-      throw new FileNodeProcessorException(e);
-    }
-
     // RegistStatService
     if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
       String statStorageDeltaName =
@@ -143,11 +128,13 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
   }
 
   public void update(UpdatePlan plan) {
+    tsFileProcessor.update(plan);
     overflowProcessor.update(plan);
   }
 
   public void delete(String device, String measurementId, long timestamp) throws IOException {
     tsFileProcessor.delete(device, measurementId, timestamp);
+    overflowProcessor.delete(device, measurementId, timestamp);
   }
 
   /**
@@ -169,9 +156,6 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
 
   }
 
-
-
-
   @Override
   public boolean canBeClosed() {
     tsFileProcessor.canBeClosed();
@@ -226,23 +210,4 @@ public class StorageGroupProcessor extends Processor implements IStatistic {
     return schema;
 
   }
-
-
-  private void readStoreFromDisk()
-      throws FileNodeProcessorException, IOException {
-    //only be used when recorvery, and at this time, there is no write operations.
-    File restoreFile = new File(fileNodeProcessorStoreFilePath);
-    if (!restoreFile.exists() || restoreFile.length() == 0) {
-      fileNodeProcessorStore = new FileNodeProcessorStore(false, new HashMap<>(),
-          new TsFileResource(null, false),
-          new ArrayList<>(), FileNodeProcessorStatus.NONE, 0);
-    }
-    try (FileInputStream inputStream = new FileInputStream(fileNodeProcessorStoreFilePath)) {
-      fileNodeProcessorStore = FileNodeProcessorStore.deSerialize(inputStream);
-    } catch (IOException e) {
-      LOGGER.error("Failed to deserialize the FileNodeRestoreFile {}, {}",
-          fileNodeProcessorStoreFilePath, e);
-      throw new FileNodeProcessorException(e);
-    }
-  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
index 22c5d73..85af489 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/tsfiledata/TsFileProcessor.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.BufferWriteProcessorException;
 import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.utils.ImmediateFuture;
 import org.apache.iotdb.db.utils.MemUtils;
@@ -75,6 +76,7 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.FileSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
@@ -339,6 +341,14 @@ public class TsFileProcessor extends Processor {
     valueCount++;
   }
 
+  public OperationResult update(UpdatePlan plan) {
+    String device = plan.getPath().getDevice();
+    String measurement = plan.getPath().getMeasurement();
+    List<Pair<Long, Long>> intervals = plan.getIntervals();
+    //TODO modify workMemtable, flushMemtable, and existing TsFiles
+    return OperationResult.WRITE_REJECT_BY_MEM;
+  }
+
   /**
    * Delete data whose timestamp <= 'timestamp' and belonging to timeseries deviceId.measurementId.
    * Delete data in both working MemTable and flushing MemTable.