You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2016/08/29 14:04:00 UTC
lucene-solr:branch_6x: SOLR-9389: HDFS Transaction logs stay open for
writes which leaks Xceivers.
Repository: lucene-solr
Updated Branches:
refs/heads/branch_6x b3526c568 -> aaee4c820
SOLR-9389: HDFS Transaction logs stay open for writes which leaks Xceivers.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/aaee4c82
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/aaee4c82
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/aaee4c82
Branch: refs/heads/branch_6x
Commit: aaee4c820556ca0f62d51f939e907864e4262a32
Parents: b3526c5
Author: markrmiller <ma...@apache.org>
Authored: Fri Aug 26 13:39:59 2016 -0400
Committer: markrmiller <ma...@apache.org>
Committed: Mon Aug 29 10:03:52 2016 -0400
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../apache/solr/update/HdfsTransactionLog.java | 141 ++++++++++---------
.../org/apache/solr/update/HdfsUpdateLog.java | 9 +-
.../org/apache/solr/update/TransactionLog.java | 5 +
.../java/org/apache/solr/update/UpdateLog.java | 2 +
5 files changed, 93 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/aaee4c82/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 7c762d2..f227db8 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -51,6 +51,8 @@ Bug Fixes
to be consistent with other places in Solr. Language names still work for backwards
compatibility. (Uwe Schindler, Boris Steiner)
+* SOLR-9389: HDFS Transaction logs stay open for writes which leaks Xceivers. (Tim Owen via Mark Miller)
+
* SOLR-9188: blockUnknown property makes inter-node communication impossible (noble)
Optimizations
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/aaee4c82/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
index 7ccbb95..e725127 100644
--- a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
@@ -64,7 +64,7 @@ public class HdfsTransactionLog extends TransactionLog {
Path tlogFile;
-
+ private long finalLogSize;
private FSDataOutputStream tlogOutStream;
private FileSystem fs;
@@ -144,13 +144,8 @@ public class HdfsTransactionLog extends TransactionLog {
@Override
public boolean endsWithCommit() throws IOException {
- long size;
- synchronized (this) {
- fos.flush();
- tlogOutStream.hflush();
- size = fos.size();
- }
-
+ ensureFlushed();
+ long size = getLogSize();
// the end of the file should have the end message (added during a commit) plus a 4 byte size
byte[] buf = new byte[ END_MESSAGE.length() ];
@@ -159,11 +154,10 @@ public class HdfsTransactionLog extends TransactionLog {
FSDataFastInputStream dis = new FSDataFastInputStream(fs.open(tlogFile), pos);
try {
- //ChannelFastInputStream is = new ChannelFastInputStream(channel, pos);
- dis.read(buf);
- for (int i=0; i<buf.length; i++) {
- if (buf[i] != END_MESSAGE.charAt(i)) return false;
- }
+ dis.read(buf);
+ for (int i=0; i<buf.length; i++) {
+ if (buf[i] != END_MESSAGE.charAt(i)) return false;
+ }
} finally {
dis.close();
}
@@ -176,10 +170,8 @@ public class HdfsTransactionLog extends TransactionLog {
public void rollback(long pos) throws IOException {
synchronized (this) {
assert snapshot_size == pos;
- fos.flush();
- tlogOutStream.hflush();
+ ensureFlushed();
// TODO: how do we rollback with hdfs?? We need HDFS-3107
- //raf.setLength(pos);
fos.setWritten(pos);
assert fos.size() == pos;
numRecords = snapshot_numRecords;
@@ -233,8 +225,10 @@ public class HdfsTransactionLog extends TransactionLog {
endRecord(pos);
- fos.flush(); // flush since this will be the last record in a log fill
- tlogOutStream.hflush();
+ ensureFlushed(); // flush since this will be the last record in a log file
+
+ // now that the commit command is written, we will never write to this log again
+ closeOutput();
//assert fos.size() == channel.size();
@@ -255,19 +249,7 @@ public class HdfsTransactionLog extends TransactionLog {
try {
// make sure any unflushed buffer has been flushed
- synchronized (this) {
- // TODO: optimize this by keeping track of what we have flushed up to
- fos.flushBuffer();
-
- // flush to hdfs
- tlogOutStream.hflush();
- /***
- System.out.println("###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
- if (fos.size() != raf.length() || pos >= fos.size() ) {
- throw new RuntimeException("ERROR" + "###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
- }
- ***/
- }
+ ensureFlushed();
FSDataFastInputStream dis = new FSDataFastInputStream(fs.open(tlogFile),
pos);
@@ -284,6 +266,52 @@ public class HdfsTransactionLog extends TransactionLog {
}
@Override
+ public void closeOutput() {
+ try {
+ doCloseOutput();
+ } catch (IOException e) {
+ log.error("Could not close tlog output", e);
+ // This situation is not fatal to the caller
+ }
+ }
+
+ private void doCloseOutput() throws IOException {
+ synchronized (this) {
+ if (fos == null) return;
+ if (debug) {
+ log.debug("Closing output for " + tlogFile);
+ }
+ fos.flushBuffer();
+ finalLogSize = fos.size();
+ fos = null;
+ }
+
+ tlogOutStream.hflush();
+ tlogOutStream.close();
+ tlogOutStream = null;
+ }
+
+ private void ensureFlushed() throws IOException {
+ synchronized (this) {
+ if (fos != null) {
+ fos.flush();
+ tlogOutStream.hflush();
+ }
+ }
+ }
+
+ @Override
+ public long getLogSize() {
+ synchronized (this) {
+ if (fos != null) {
+ return fos.size();
+ } else {
+ return finalLogSize;
+ }
+ }
+ }
+
+ @Override
public void finish(UpdateLog.SyncLevel syncLevel) {
if (syncLevel == UpdateLog.SyncLevel.NONE) return;
try {
@@ -309,12 +337,7 @@ public class HdfsTransactionLog extends TransactionLog {
log.debug("Closing tlog" + this);
}
- synchronized (this) {
- fos.flushBuffer();
- }
-
- tlogOutStream.hflush();
- tlogOutStream.close();
+ doCloseOutput();
} catch (IOException e) {
log.error("Exception closing tlog.", e);
@@ -359,17 +382,19 @@ public class HdfsTransactionLog extends TransactionLog {
public HDFSLogReader(long startingPos) {
super();
incref();
+ initStream(startingPos);
+ }
+
+ private void initStream(long pos) {
try {
synchronized (HdfsTransactionLog.this) {
- fos.flushBuffer();
- sz = fos.size();
+ ensureFlushed();
+ sz = getLogSize();
}
-
- tlogOutStream.hflush();
-
+
FSDataInputStream fdis = fs.open(tlogFile);
- fis = new FSDataFastInputStream(fdis, startingPos);
+ fis = new FSDataFastInputStream(fdis, pos);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -385,10 +410,10 @@ public class HdfsTransactionLog extends TransactionLog {
synchronized (HdfsTransactionLog.this) {
if (trace) {
- log.trace("Reading log record. pos="+pos+" currentSize="+fos.size());
+ log.trace("Reading log record. pos="+pos+" currentSize="+getLogSize());
}
- if (pos >= fos.size()) {
+ if (pos >= getLogSize()) {
return null;
}
}
@@ -398,16 +423,8 @@ public class HdfsTransactionLog extends TransactionLog {
if (pos >= sz) {
log.info("Read available inputstream data, opening new inputstream pos={} sz={}", pos, sz);
- synchronized (HdfsTransactionLog.this) {
- fos.flushBuffer();
- sz = fos.size();
- }
-
- tlogOutStream.hflush();
fis.close();
-
- FSDataInputStream fdis = fs.open(tlogFile);
- fis = new FSDataFastInputStream(fdis, pos);
+ initStream(pos);
}
if (pos == 0) {
@@ -415,7 +432,7 @@ public class HdfsTransactionLog extends TransactionLog {
// shouldn't currently happen - header and first record are currently written at the same time
synchronized (HdfsTransactionLog.this) {
- if (fis.position() >= fos.size()) {
+ if (fis.position() >= getLogSize()) {
return null;
}
pos = fis.position();
@@ -443,7 +460,7 @@ public class HdfsTransactionLog extends TransactionLog {
@Override
public String toString() {
synchronized (HdfsTransactionLog.this) {
- return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + fos.size() + "}";
+ return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + getLogSize() + "}";
}
}
@@ -454,7 +471,7 @@ public class HdfsTransactionLog extends TransactionLog {
@Override
public long currentSize() {
- return fos.size();
+ return getLogSize();
}
}
@@ -478,12 +495,8 @@ public class HdfsTransactionLog extends TransactionLog {
long sz;
synchronized (HdfsTransactionLog.this) {
- fos.flushBuffer();
-
- // this must be an hflush
- tlogOutStream.hflush();
- sz = fos.size();
- //assert sz == channel.size();
+ ensureFlushed();
+ sz = getLogSize();
}
fis = new FSDataFastInputStream(fs.open(tlogFile), 0);
@@ -554,7 +567,7 @@ public class HdfsTransactionLog extends TransactionLog {
@Override
public String toString() {
synchronized (HdfsTransactionLog.this) {
- return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + fos.size() + "}";
+ return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + getLogSize() + "}";
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/aaee4c82/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java b/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
index 4cbcf4f..764b099 100644
--- a/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
@@ -219,8 +219,13 @@ public class HdfsUpdateLog extends UpdateLog {
// It's possible that at abnormal close both "tlog" and "prevTlog" were
// uncapped.
for (TransactionLog ll : logs) {
- newestLogsOnStartup.addFirst(ll);
- if (newestLogsOnStartup.size() >= 2) break;
+ if (newestLogsOnStartup.size() < 2) {
+ newestLogsOnStartup.addFirst(ll);
+ } else {
+ // We're never going to modify old non-recovery logs - no need to hold their output open
+ log.info("Closing output for old non-recovery log " + ll);
+ ll.closeOutput();
+ }
}
try {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/aaee4c82/solr/core/src/java/org/apache/solr/update/TransactionLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
index f7213ed..997485a 100644
--- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
@@ -520,6 +520,11 @@ public class TransactionLog implements Closeable {
}
}
+ /** Move to a read-only state, closing and releasing resources while keeping the log available for reads */
+ public void closeOutput() {
+
+ }
+
public void finish(UpdateLog.SyncLevel syncLevel) {
if (syncLevel == UpdateLog.SyncLevel.NONE) return;
try {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/aaee4c82/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 0b4fc18..5b917b8 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -821,11 +821,13 @@ public class UpdateLog implements PluginInfoInitialized {
try {
if (ll.endsWithCommit()) {
+ ll.closeOutput();
ll.decref();
continue;
}
} catch (IOException e) {
log.error("Error inspecting tlog " + ll, e);
+ ll.closeOutput();
ll.decref();
continue;
}