You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/06/14 06:01:24 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated (b8ff779 -> 35db691)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a change to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


    from b8ff779  Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile
     new 95967f1  add git status
     new 35db691  add git status

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../db/engine/filenode/FileNodeProcessor.java      | 68 +++++++++---------
 .../db/engine/filenode/FileNodeProcessorTest.java  | 80 ++++++++++++++++++++++
 2 files changed, 114 insertions(+), 34 deletions(-)
 create mode 100644 iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorTest.java


[incubator-iotdb] 02/02: add git status

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 35db6918b48f10557df809c5f1b3407f2478d17b
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Fri Jun 14 14:01:12 2019 +0800

    add git status
---
 .../db/engine/filenode/FileNodeProcessor.java      | 68 +++++++++++-----------
 1 file changed, 34 insertions(+), 34 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index c9d1077..40a72c4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -2121,38 +2121,38 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     }
   }
 
-//  /**
-//   * wait for all closing processors finishing their tasks
-//   */
-//  public void waitforAllClosed() throws FileNodeProcessorException {
-//    close();
-//    while (getClosingBufferWriteProcessor().size() != 0) {
-//      checkAllClosingProcessors();
-//      try {
-//        Thread.sleep(10);
-//      } catch (InterruptedException e) {
-//        LOGGER.error("Filenode Processor {} is interrupted when waiting for all closed.", processorName, e);
-//      }
-//    }
-//  }
-
-
-//  void checkAllClosingProcessors() {
-//    Iterator<BufferWriteProcessor> iterator =
-//        this.getClosingBufferWriteProcessor().iterator();
-//    while (iterator.hasNext()) {
-//      BufferWriteProcessor processor = iterator.next();
-//      try {
-//        if (processor.getCloseFuture().get(10, TimeUnit.MILLISECONDS)) {
-//          //if finished, we can remove it.
-//          iterator.remove();
-//        }
-//      } catch (InterruptedException | ExecutionException e) {
-//        LOGGER.error("Close bufferwrite processor {} failed.", processor.getProcessorName(), e);
-//      } catch (TimeoutException e) {
-//        //do nothing.
-//      }
-//    }
-//    this.getClosingBufferWriteProcessor().reset();
-//  }
+  /**
+   * wait for all closing processors finishing their tasks
+   */
+  public void waitforAllClosed() throws FileNodeProcessorException {
+    close();
+    while (getClosingBufferWriteProcessor().size() != 0) {
+      checkAllClosingProcessors();
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        LOGGER.error("Filenode Processor {} is interrupted when waiting for all closed.", processorName, e);
+      }
+    }
+  }
+
+
+  void checkAllClosingProcessors() {
+    Iterator<BufferWriteProcessor> iterator =
+        this.getClosingBufferWriteProcessor().iterator();
+    while (iterator.hasNext()) {
+      BufferWriteProcessor processor = iterator.next();
+      try {
+        if (processor.getCloseFuture().get(10, TimeUnit.MILLISECONDS)) {
+          //if finished, we can remove it.
+          iterator.remove();
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        LOGGER.error("Close bufferwrite processor {} failed.", processor.getProcessorName(), e);
+      } catch (TimeoutException e) {
+        //do nothing.
+      }
+    }
+    this.getClosingBufferWriteProcessor().reset();
+  }
 }
\ No newline at end of file


[incubator-iotdb] 01/02: add git status

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 95967f1ca55ae3f36eb5ccb7b0dd371b83722d41
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Fri Jun 14 14:00:55 2019 +0800

    add git status
---
 .../db/engine/filenode/FileNodeProcessorTest.java  | 80 ++++++++++++++++++++++
 1 file changed, 80 insertions(+)

diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorTest.java
new file mode 100644
index 0000000..5f862e1
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessorTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.filenode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
+import org.apache.iotdb.db.exception.BufferWriteProcessorException;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.FileNodeProcessorException;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FileNodeProcessorTest {
+
+  FileNodeProcessor processor;
+  private String deviceId = "root.vehicle.d0";
+  private String measurementId = "s0";
+  private TSDataType dataType = TSDataType.INT32;
+  private String processName = "root.vehicle";
+  @Before
+  public void setUp() throws FileNodeProcessorException {
+    // init metadata
+    MetadataManagerHelper.initMetadata();
+    processor = new FileNodeProcessor(IoTDBDescriptor.getInstance().getConfig().getFileNodeDir(), processName);
+
+  }
+
+  @After
+  public void tearDown() throws IOException, FileNodeManagerException {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void testAsyncClose()
+      throws FileNodeProcessorException, BufferWriteProcessorException, ExecutionException, InterruptedException {
+
+    BufferWriteProcessor bwProcessor;
+    int i =1;
+    for (int j = 1; j < 5; j++) {
+      bwProcessor = processor.getBufferWriteProcessor(processName, System.currentTimeMillis());
+      for (; i <= 100 * j; i++) {
+        bwProcessor.write(deviceId, measurementId, i, TSDataType.INT32, String.valueOf(i));
+      }
+      processor.closeBufferWrite();
+    }
+    Assert.assertNotEquals(0, processor.getClosingBufferWriteProcessor().size());
+    processor.waitforAllClosed();
+    Assert.assertEquals(0, processor.getClosingBufferWriteProcessor().size());
+
+  }
+}