You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2019/03/29 19:35:28 UTC

[incubator-heron] branch master updated: fix saving and restoring checkpoint from dlog (#3223)

This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c64d53  fix saving and restoring checkpoint from dlog (#3223)
5c64d53 is described below

commit 5c64d53a6270c9f0154fd9db34897f434b90bf81
Author: SiMing Weng <si...@gmail.com>
AuthorDate: Fri Mar 29 15:35:23 2019 -0400

    fix saving and restoring checkpoint from dlog (#3223)
    
    Flush the output stream before closing the underlying dlog writer.
    Return -1 when no bytes are read from the dlog reader.
---
 .../java/org/apache/heron/dlog/DLInputStream.java  | 98 ++++++++++++----------
 .../java/org/apache/heron/dlog/DLOutputStream.java | 12 ++-
 .../org/apache/heron/dlog/DLInputStreamTest.java   |  2 +-
 .../heron/statefulstorage/dlog/DlogStorage.java    | 11 +++
 heron/statefulstorages/tests/java/BUILD            |  1 +
 .../statefulstorage/dlog/DlogStorageTest.java      |  4 +-
 6 files changed, 78 insertions(+), 50 deletions(-)

diff --git a/heron/io/dlog/src/java/org/apache/heron/dlog/DLInputStream.java b/heron/io/dlog/src/java/org/apache/heron/dlog/DLInputStream.java
index 0e6581b..75f1f1b 100644
--- a/heron/io/dlog/src/java/org/apache/heron/dlog/DLInputStream.java
+++ b/heron/io/dlog/src/java/org/apache/heron/dlog/DLInputStream.java
@@ -21,6 +21,7 @@ package org.apache.heron.dlog;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.logging.Logger;
 
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.LogRecordWithDLSN;
@@ -29,36 +30,12 @@ import org.apache.distributedlog.api.LogReader;
 import org.apache.distributedlog.exceptions.EndOfStreamException;
 
 public class DLInputStream extends InputStream {
-
-  private LogRecordWithInputStream currentLogRecord = null;
+  private static final Logger LOG = Logger.getLogger(DLInputStream.class.getName());
   private final DistributedLogManager dlm;
+  private LogRecordWithInputStream currentLogRecord = null;
   private LogReader reader;
   private boolean eos = false;
-
-  // Cache the input stream for a log record.
-  private static class LogRecordWithInputStream {
-    private final InputStream payloadStream;
-    private final LogRecordWithDLSN logRecord;
-
-    LogRecordWithInputStream(LogRecordWithDLSN logRecord) {
-      this.logRecord = logRecord;
-      this.payloadStream = logRecord.getPayLoadInputStream();
-    }
-
-    InputStream getPayLoadInputStream() {
-      return payloadStream;
-    }
-
-    LogRecordWithDLSN getLogRecord() {
-      return logRecord;
-    }
-
-    // The last txid of the log record is the position of the next byte in the stream.
-    // Subtract length to get starting offset.
-    long getOffset() {
-      return logRecord.getTransactionId() - logRecord.getPayload().length;
-    }
-  }
+  private long numOfBytesRead = 0;
 
   /**
    * Construct distributedlog input stream
@@ -70,21 +47,6 @@ public class DLInputStream extends InputStream {
     reader = dlm.getInputStream(DLSN.InitialDLSN);
   }
 
-  /**
-   * Get input stream representing next entry in the
-   * ledger.
-   *
-   * @return input stream, or null if no more entries
-   */
-  private LogRecordWithInputStream nextLogRecord() throws IOException {
-    try {
-      return nextLogRecord(reader);
-    } catch (EndOfStreamException e) {
-      eos = true;
-      return null;
-    }
-  }
-
   private static LogRecordWithInputStream nextLogRecord(LogReader reader) throws IOException {
     LogRecordWithDLSN record = reader.readNext(false);
 
@@ -100,6 +62,26 @@ public class DLInputStream extends InputStream {
     }
   }
 
+  public long getNumOfBytesRead() {
+    return numOfBytesRead;
+  }
+
+  /**
+   * Get input stream representing next entry in the
+   * ledger.
+   *
+   * @return input stream, or null if no more entries
+   */
+  private LogRecordWithInputStream nextLogRecord() throws IOException {
+    try {
+      return nextLogRecord(reader);
+    } catch (EndOfStreamException e) {
+      eos = true;
+      LOG.info(()->"end of stream is reached");
+      return null;
+    }
+  }
+
   @Override
   public int read() throws IOException {
     byte[] b = new byte[1];
@@ -120,7 +102,7 @@ public class DLInputStream extends InputStream {
     if (currentLogRecord == null) {
       currentLogRecord = nextLogRecord();
       if (currentLogRecord == null) {
-        return read;
+        return -1;
       }
     }
 
@@ -129,13 +111,14 @@ public class DLInputStream extends InputStream {
       if (thisread == -1) {
         currentLogRecord = nextLogRecord();
         if (currentLogRecord == null) {
-          return read;
+          return read == 0 ? -1 : read;
         }
       } else {
+        numOfBytesRead += thisread;
         read += thisread;
       }
     }
-    return read;
+    return read == 0 ? -1 : read;
   }
 
   @Override
@@ -143,4 +126,29 @@ public class DLInputStream extends InputStream {
     reader.close();
     dlm.close();
   }
+
+  // Cache the input stream for a log record.
+  private static class LogRecordWithInputStream {
+    private final InputStream payloadStream;
+    private final LogRecordWithDLSN logRecord;
+
+    LogRecordWithInputStream(LogRecordWithDLSN logRecord) {
+      this.logRecord = logRecord;
+      this.payloadStream = logRecord.getPayLoadInputStream();
+    }
+
+    InputStream getPayLoadInputStream() {
+      return payloadStream;
+    }
+
+    LogRecordWithDLSN getLogRecord() {
+      return logRecord;
+    }
+
+    // The last txid of the log record is the position of the next byte in the stream.
+    // Subtract length to get starting offset.
+    long getOffset() {
+      return logRecord.getTransactionId() - logRecord.getPayload().length;
+    }
+  }
 }
diff --git a/heron/io/dlog/src/java/org/apache/heron/dlog/DLOutputStream.java b/heron/io/dlog/src/java/org/apache/heron/dlog/DLOutputStream.java
index 41b9cb2..87d3103 100644
--- a/heron/io/dlog/src/java/org/apache/heron/dlog/DLOutputStream.java
+++ b/heron/io/dlog/src/java/org/apache/heron/dlog/DLOutputStream.java
@@ -21,6 +21,7 @@ package org.apache.heron.dlog;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.logging.Logger;
 
 import org.apache.distributedlog.AppendOnlyStreamWriter;
 import org.apache.distributedlog.api.DistributedLogManager;
@@ -29,9 +30,10 @@ import org.apache.distributedlog.api.DistributedLogManager;
  * DistributedLog Output Stream.
  */
 public class DLOutputStream extends OutputStream {
-
+  private static final Logger LOG = Logger.getLogger(DLOutputStream.class.getName());
   private final DistributedLogManager dlm;
   private final AppendOnlyStreamWriter writer;
+  private long numOfBytesWritten = 0;
 
   public DLOutputStream(DistributedLogManager dlm,
                         AppendOnlyStreamWriter writer) {
@@ -41,7 +43,7 @@ public class DLOutputStream extends OutputStream {
 
   @Override
   public void write(int b) throws IOException {
-    byte[] data = new byte[] {
+    byte[] data = new byte[]{
         (byte) b
     };
     write(data);
@@ -57,6 +59,8 @@ public class DLOutputStream extends OutputStream {
 
   @Override
   public void write(byte[] b) throws IOException {
+    LOG.info(()-> "writing " + b.length + " bytes to output stream");
+    numOfBytesWritten = numOfBytesWritten + b.length;
     writer.write(b);
   }
 
@@ -71,4 +75,8 @@ public class DLOutputStream extends OutputStream {
     writer.close();
     dlm.close();
   }
+
+  public long getNumOfBytesWritten() {
+    return numOfBytesWritten;
+  }
 }
diff --git a/heron/io/dlog/tests/java/org/apache/heron/dlog/DLInputStreamTest.java b/heron/io/dlog/tests/java/org/apache/heron/dlog/DLInputStreamTest.java
index 7925e1e..1f3f675 100644
--- a/heron/io/dlog/tests/java/org/apache/heron/dlog/DLInputStreamTest.java
+++ b/heron/io/dlog/tests/java/org/apache/heron/dlog/DLInputStreamTest.java
@@ -54,7 +54,7 @@ public class DLInputStreamTest {
     byte[] b = new byte[1];
     DLInputStream in = new DLInputStream(dlm);
     assertEquals("Should return 0 when reading an empty eos stream",
-        0, in.read(b, 0, 1));
+        -1, in.read(b, 0, 1));
     assertEquals("Should return -1 when reading an empty eos stream",
         -1, in.read(b, 0, 1));
   }
diff --git a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java
index e53e45f..b67539d 100644
--- a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java
+++ b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java
@@ -145,10 +145,17 @@ public class DlogStorage implements IStatefulStorage {
     OutputStream out = null;
     try {
       out = openOutputStream(checkpointPath);
+      LOG.info(() -> String.format("writing a check point of %d bytes",
+          checkpoint.getCheckpoint().getSerializedSize()));
       checkpoint.getCheckpoint().writeTo(out);
+      out.flush();
     } catch (IOException e) {
       throw new StatefulStorageException("Failed to persist checkpoint @ " + checkpointPath, e);
     } finally {
+      if (out instanceof DLOutputStream) { // null-safe; a bad cast would skip close()
+        final long num = ((DLOutputStream) out).getNumOfBytesWritten();
+        LOG.info(() -> num + " bytes written");
+      }
       SysUtils.closeIgnoringExceptions(out);
     }
   }
@@ -170,6 +177,10 @@ public class DlogStorage implements IStatefulStorage {
     } catch (IOException ioe) {
       throw new StatefulStorageException("Failed to read checkpoint from " + checkpointPath, ioe);
     } finally {
+      if (in instanceof DLInputStream) { // null-safe; a bad cast would skip close()
+        final long num = ((DLInputStream) in).getNumOfBytesRead();
+        LOG.info(() -> num + " bytes read");
+      }
       SysUtils.closeIgnoringExceptions(in);
     }
 
diff --git a/heron/statefulstorages/tests/java/BUILD b/heron/statefulstorages/tests/java/BUILD
index c5671bc..5da7047 100644
--- a/heron/statefulstorages/tests/java/BUILD
+++ b/heron/statefulstorages/tests/java/BUILD
@@ -48,6 +48,7 @@ java_tests(
 
 dlog_deps_files = [
   "//heron/statefulstorages/src/java:dlog-statefulstorage-java",
+  "//heron/io/dlog/src/java:dlog-lib",
   "@com_google_guava_guava//jar",
   "@org_apache_distributedlog_core//jar",
   "@io_netty_netty_all//jar",
diff --git a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java
index 6e55c3c..7780165 100644
--- a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java
+++ b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.heron.statefulstorage.dlog;
 
-import java.io.InputStream;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.List;
@@ -40,6 +39,7 @@ import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.heron.dlog.DLInputStream;
 import org.apache.heron.proto.ckptmgr.CheckpointManager;
 import org.apache.heron.proto.system.PhysicalPlans;
 import org.apache.heron.spi.statefulstorage.Checkpoint;
@@ -118,7 +118,7 @@ public class DlogStorageTest {
 
   @Test
   public void testRestore() throws Exception {
-    InputStream mockInputStream = mock(InputStream.class);
+    DLInputStream mockInputStream = mock(DLInputStream.class);
     doReturn(mockInputStream).when(dlogStorage).openInputStream(anyString());
 
     PowerMockito.spy(CheckpointManager.InstanceStateCheckpoint.class);