You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2019/03/29 19:35:28 UTC
[incubator-heron] branch master updated: fix saving and restoring
checkpoint from dlog (#3223)
This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 5c64d53 fix saving and restoring checkpoint from dlog (#3223)
5c64d53 is described below
commit 5c64d53a6270c9f0154fd9db34897f434b90bf81
Author: SiMing Weng <si...@gmail.com>
AuthorDate: Fri Mar 29 15:35:23 2019 -0400
fix saving and restoring checkpoint from dlog (#3223)
Flush the output stream before closing the underlying dlog writer.
Return -1 when no bytes are read from the dlog reader.
---
.../java/org/apache/heron/dlog/DLInputStream.java | 98 ++++++++++++----------
.../java/org/apache/heron/dlog/DLOutputStream.java | 12 ++-
.../org/apache/heron/dlog/DLInputStreamTest.java | 2 +-
.../heron/statefulstorage/dlog/DlogStorage.java | 11 +++
heron/statefulstorages/tests/java/BUILD | 1 +
.../statefulstorage/dlog/DlogStorageTest.java | 4 +-
6 files changed, 78 insertions(+), 50 deletions(-)
diff --git a/heron/io/dlog/src/java/org/apache/heron/dlog/DLInputStream.java b/heron/io/dlog/src/java/org/apache/heron/dlog/DLInputStream.java
index 0e6581b..75f1f1b 100644
--- a/heron/io/dlog/src/java/org/apache/heron/dlog/DLInputStream.java
+++ b/heron/io/dlog/src/java/org/apache/heron/dlog/DLInputStream.java
@@ -21,6 +21,7 @@ package org.apache.heron.dlog;
import java.io.IOException;
import java.io.InputStream;
+import java.util.logging.Logger;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.LogRecordWithDLSN;
@@ -29,36 +30,12 @@ import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.exceptions.EndOfStreamException;
public class DLInputStream extends InputStream {
-
- private LogRecordWithInputStream currentLogRecord = null;
+ private static final Logger LOG = Logger.getLogger(DLInputStream.class.getName());
private final DistributedLogManager dlm;
+ private LogRecordWithInputStream currentLogRecord = null;
private LogReader reader;
private boolean eos = false;
-
- // Cache the input stream for a log record.
- private static class LogRecordWithInputStream {
- private final InputStream payloadStream;
- private final LogRecordWithDLSN logRecord;
-
- LogRecordWithInputStream(LogRecordWithDLSN logRecord) {
- this.logRecord = logRecord;
- this.payloadStream = logRecord.getPayLoadInputStream();
- }
-
- InputStream getPayLoadInputStream() {
- return payloadStream;
- }
-
- LogRecordWithDLSN getLogRecord() {
- return logRecord;
- }
-
- // The last txid of the log record is the position of the next byte in the stream.
- // Subtract length to get starting offset.
- long getOffset() {
- return logRecord.getTransactionId() - logRecord.getPayload().length;
- }
- }
+ private long numOfBytesRead = 0;
/**
* Construct distributedlog input stream
@@ -70,21 +47,6 @@ public class DLInputStream extends InputStream {
reader = dlm.getInputStream(DLSN.InitialDLSN);
}
- /**
- * Get input stream representing next entry in the
- * ledger.
- *
- * @return input stream, or null if no more entries
- */
- private LogRecordWithInputStream nextLogRecord() throws IOException {
- try {
- return nextLogRecord(reader);
- } catch (EndOfStreamException e) {
- eos = true;
- return null;
- }
- }
-
private static LogRecordWithInputStream nextLogRecord(LogReader reader) throws IOException {
LogRecordWithDLSN record = reader.readNext(false);
@@ -100,6 +62,26 @@ public class DLInputStream extends InputStream {
}
}
+ public long getNumOfBytesRead() {
+ return numOfBytesRead;
+ }
+
+ /**
+ * Get input stream representing next entry in the
+ * ledger.
+ *
+ * @return input stream, or null if no more entries
+ */
+ private LogRecordWithInputStream nextLogRecord() throws IOException {
+ try {
+ return nextLogRecord(reader);
+ } catch (EndOfStreamException e) {
+ eos = true;
+ LOG.info(()->"end of stream is reached");
+ return null;
+ }
+ }
+
@Override
public int read() throws IOException {
byte[] b = new byte[1];
@@ -120,7 +102,7 @@ public class DLInputStream extends InputStream {
if (currentLogRecord == null) {
currentLogRecord = nextLogRecord();
if (currentLogRecord == null) {
- return read;
+ return -1;
}
}
@@ -129,13 +111,14 @@ public class DLInputStream extends InputStream {
if (thisread == -1) {
currentLogRecord = nextLogRecord();
if (currentLogRecord == null) {
- return read;
+ return read == 0 ? -1 : read;
}
} else {
+ numOfBytesRead += thisread;
read += thisread;
}
}
- return read;
+ return read == 0 ? -1 : read;
}
@Override
@@ -143,4 +126,29 @@ public class DLInputStream extends InputStream {
reader.close();
dlm.close();
}
+
+ // Cache the input stream for a log record.
+ private static class LogRecordWithInputStream {
+ private final InputStream payloadStream;
+ private final LogRecordWithDLSN logRecord;
+
+ LogRecordWithInputStream(LogRecordWithDLSN logRecord) {
+ this.logRecord = logRecord;
+ this.payloadStream = logRecord.getPayLoadInputStream();
+ }
+
+ InputStream getPayLoadInputStream() {
+ return payloadStream;
+ }
+
+ LogRecordWithDLSN getLogRecord() {
+ return logRecord;
+ }
+
+ // The last txid of the log record is the position of the next byte in the stream.
+ // Subtract length to get starting offset.
+ long getOffset() {
+ return logRecord.getTransactionId() - logRecord.getPayload().length;
+ }
+ }
}
diff --git a/heron/io/dlog/src/java/org/apache/heron/dlog/DLOutputStream.java b/heron/io/dlog/src/java/org/apache/heron/dlog/DLOutputStream.java
index 41b9cb2..87d3103 100644
--- a/heron/io/dlog/src/java/org/apache/heron/dlog/DLOutputStream.java
+++ b/heron/io/dlog/src/java/org/apache/heron/dlog/DLOutputStream.java
@@ -21,6 +21,7 @@ package org.apache.heron.dlog;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.logging.Logger;
import org.apache.distributedlog.AppendOnlyStreamWriter;
import org.apache.distributedlog.api.DistributedLogManager;
@@ -29,9 +30,10 @@ import org.apache.distributedlog.api.DistributedLogManager;
* DistributedLog Output Stream.
*/
public class DLOutputStream extends OutputStream {
-
+ private static final Logger LOG = Logger.getLogger(DLOutputStream.class.getName());
private final DistributedLogManager dlm;
private final AppendOnlyStreamWriter writer;
+ private long numOfBytesWritten = 0;
public DLOutputStream(DistributedLogManager dlm,
AppendOnlyStreamWriter writer) {
@@ -41,7 +43,7 @@ public class DLOutputStream extends OutputStream {
@Override
public void write(int b) throws IOException {
- byte[] data = new byte[] {
+ byte[] data = new byte[]{
(byte) b
};
write(data);
@@ -57,6 +59,8 @@ public class DLOutputStream extends OutputStream {
@Override
public void write(byte[] b) throws IOException {
+ LOG.info(()-> "writing " + b.length + " bytes to output stream");
+ numOfBytesWritten = numOfBytesWritten + b.length;
writer.write(b);
}
@@ -71,4 +75,8 @@ public class DLOutputStream extends OutputStream {
writer.close();
dlm.close();
}
+
+ public long getNumOfBytesWritten() {
+ return numOfBytesWritten;
+ }
}
diff --git a/heron/io/dlog/tests/java/org/apache/heron/dlog/DLInputStreamTest.java b/heron/io/dlog/tests/java/org/apache/heron/dlog/DLInputStreamTest.java
index 7925e1e..1f3f675 100644
--- a/heron/io/dlog/tests/java/org/apache/heron/dlog/DLInputStreamTest.java
+++ b/heron/io/dlog/tests/java/org/apache/heron/dlog/DLInputStreamTest.java
@@ -54,7 +54,7 @@ public class DLInputStreamTest {
byte[] b = new byte[1];
DLInputStream in = new DLInputStream(dlm);
assertEquals("Should return 0 when reading an empty eos stream",
- 0, in.read(b, 0, 1));
+ -1, in.read(b, 0, 1));
assertEquals("Should return -1 when reading an empty eos stream",
-1, in.read(b, 0, 1));
}
diff --git a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java
index e53e45f..b67539d 100644
--- a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java
+++ b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/dlog/DlogStorage.java
@@ -145,10 +145,17 @@ public class DlogStorage implements IStatefulStorage {
OutputStream out = null;
try {
out = openOutputStream(checkpointPath);
+ LOG.info(() -> String.format("writing a check point of %d bytes",
+ checkpoint.getCheckpoint().getSerializedSize()));
checkpoint.getCheckpoint().writeTo(out);
+ out.flush();
} catch (IOException e) {
throw new StatefulStorageException("Failed to persist checkpoint @ " + checkpointPath, e);
} finally {
+ if (out != null) {
+ final long num = ((DLOutputStream) out).getNumOfBytesWritten();
+ LOG.info(() -> num + "bytes written");
+ }
SysUtils.closeIgnoringExceptions(out);
}
}
@@ -170,6 +177,10 @@ public class DlogStorage implements IStatefulStorage {
} catch (IOException ioe) {
throw new StatefulStorageException("Failed to read checkpoint from " + checkpointPath, ioe);
} finally {
+ if (in != null) {
+ final long num = ((DLInputStream) in).getNumOfBytesRead();
+ LOG.info(() -> num + " bytes read");
+ }
SysUtils.closeIgnoringExceptions(in);
}
diff --git a/heron/statefulstorages/tests/java/BUILD b/heron/statefulstorages/tests/java/BUILD
index c5671bc..5da7047 100644
--- a/heron/statefulstorages/tests/java/BUILD
+++ b/heron/statefulstorages/tests/java/BUILD
@@ -48,6 +48,7 @@ java_tests(
dlog_deps_files = [
"//heron/statefulstorages/src/java:dlog-statefulstorage-java",
+ "//heron/io/dlog/src/java:dlog-lib",
"@com_google_guava_guava//jar",
"@org_apache_distributedlog_core//jar",
"@io_netty_netty_all//jar",
diff --git a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java
index 6e55c3c..7780165 100644
--- a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java
+++ b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/dlog/DlogStorageTest.java
@@ -19,7 +19,6 @@
package org.apache.heron.statefulstorage.dlog;
-import java.io.InputStream;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
@@ -40,6 +39,7 @@ import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.heron.dlog.DLInputStream;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.system.PhysicalPlans;
import org.apache.heron.spi.statefulstorage.Checkpoint;
@@ -118,7 +118,7 @@ public class DlogStorageTest {
@Test
public void testRestore() throws Exception {
- InputStream mockInputStream = mock(InputStream.class);
+ DLInputStream mockInputStream = mock(DLInputStream.class);
doReturn(mockInputStream).when(dlogStorage).openInputStream(anyString());
PowerMockito.spy(CheckpointManager.InstanceStateCheckpoint.class);