You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2018/12/19 19:34:20 UTC
[accumulo] branch master updated: Stop creating double OutputStreams
for WAL (#829)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 2297c0c Stop creating double OutputStreams for WAL (#829)
2297c0c is described below
commit 2297c0c3d03c49491dc1c2e82086572c136aa014
Author: Mike Miller <mm...@apache.org>
AuthorDate: Wed Dec 19 14:34:16 2018 -0500
Stop creating double OutputStreams for WAL (#829)
---
.../core/crypto/streams/NoFlushOutputStream.java | 4 ++--
.../org/apache/accumulo/tserver/log/DfsLogger.java | 21 +++++++++++----------
2 files changed, 13 insertions(+), 12 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/crypto/streams/NoFlushOutputStream.java b/core/src/main/java/org/apache/accumulo/core/crypto/streams/NoFlushOutputStream.java
index a8b6901..cea7798 100644
--- a/core/src/main/java/org/apache/accumulo/core/crypto/streams/NoFlushOutputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/crypto/streams/NoFlushOutputStream.java
@@ -16,11 +16,11 @@
*/
package org.apache.accumulo.core.crypto.streams;
-import java.io.FilterOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-public class NoFlushOutputStream extends FilterOutputStream {
+public class NoFlushOutputStream extends DataOutputStream {
public NoFlushOutputStream(OutputStream out) {
super(out);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 522cc70..d7cd7eb 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -48,6 +48,7 @@ import org.apache.accumulo.core.crypto.CryptoUtils;
import org.apache.accumulo.core.crypto.streams.NoFlushOutputStream;
import org.apache.accumulo.core.cryptoImpl.CryptoEnvironmentImpl;
import org.apache.accumulo.core.cryptoImpl.NoCryptoService;
+import org.apache.accumulo.core.cryptoImpl.NoFileEncrypter;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
@@ -130,17 +131,9 @@ public class DfsLogger implements Comparable<DfsLogger> {
return originalInput;
}
- public void setOriginalInput(FSDataInputStream originalInput) {
- this.originalInput = originalInput;
- }
-
public DataInputStream getDecryptingInputStream() {
return decryptingInputStream;
}
-
- public void setDecryptingInputStream(DataInputStream decryptingInputStream) {
- this.decryptingInputStream = decryptingInputStream;
- }
}
public interface ServerResources {
@@ -443,8 +436,16 @@ public class DfsLogger implements Comparable<DfsLogger> {
byte[] cryptoParams = encrypter.getDecryptionParameters();
CryptoUtils.writeParams(cryptoParams, logFile);
- encryptingLogFile = new DataOutputStream(
- encrypter.encryptStream(new NoFlushOutputStream(logFile)));
+ /** Always wrap the WAL in a NoFlushOutputStream to prevent extra flushing to HDFS.
+ * The {@link #write(LogFileKey, LogFileValue)} method will flush crypto data or do nothing
+ * when crypto is not enabled.
+ **/
+ OutputStream encryptedStream = encrypter.encryptStream(new NoFlushOutputStream(logFile));
+ if (encryptedStream instanceof NoFlushOutputStream) {
+ encryptingLogFile = (NoFlushOutputStream)encryptedStream;
+ } else {
+ encryptingLogFile = new DataOutputStream(encryptedStream);
+ }
LogFileKey key = new LogFileKey();
key.event = OPEN;