You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2016/09/07 09:57:06 UTC

[18/50] [abbrv] lucene-solr:apiv2: SOLR-9389: HDFS Transaction logs stay open for writes which leaks Xceivers.

SOLR-9389: HDFS Transaction logs stay open for writes which leaks Xceivers.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/6bff06ce
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/6bff06ce
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/6bff06ce

Branch: refs/heads/apiv2
Commit: 6bff06ce4fad8edbe2a45e9b3639dfc8d3d2bb87
Parents: 5861a2e
Author: markrmiller <ma...@apache.org>
Authored: Fri Aug 26 13:39:59 2016 -0400
Committer: markrmiller <ma...@apache.org>
Committed: Fri Aug 26 13:39:59 2016 -0400

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   2 +
 .../apache/solr/update/HdfsTransactionLog.java  | 141 ++++++++++---------
 .../org/apache/solr/update/HdfsUpdateLog.java   |   9 +-
 .../org/apache/solr/update/TransactionLog.java  |   5 +
 .../java/org/apache/solr/update/UpdateLog.java  |   2 +
 5 files changed, 93 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/6bff06ce/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 1473fa9..f4bf7a7 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -74,6 +74,8 @@ Bug Fixes
 
 * SOLR-9310: PeerSync fails on a node restart due to IndexFingerPrint mismatch (Pushkar Raste, noble)
 
+* SOLR-9389: HDFS Transaction logs stay open for writes which leaks Xceivers. (Tim Owen via Mark Miller)
+
 Optimizations
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/6bff06ce/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
index 7ccbb95..e725127 100644
--- a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
@@ -64,7 +64,7 @@ public class HdfsTransactionLog extends TransactionLog {
 
   Path tlogFile;
 
-  
+  private long finalLogSize;
   private FSDataOutputStream tlogOutStream;
   private FileSystem fs;
 
@@ -144,13 +144,8 @@ public class HdfsTransactionLog extends TransactionLog {
 
   @Override
   public boolean endsWithCommit() throws IOException {
-    long size;
-    synchronized (this) {
-      fos.flush();
-      tlogOutStream.hflush();
-      size = fos.size();
-    }
-
+    ensureFlushed();
+    long size = getLogSize();
     
     // the end of the file should have the end message (added during a commit) plus a 4 byte size
     byte[] buf = new byte[ END_MESSAGE.length() ];
@@ -159,11 +154,10 @@ public class HdfsTransactionLog extends TransactionLog {
     
     FSDataFastInputStream dis = new FSDataFastInputStream(fs.open(tlogFile), pos);
     try {
-    //ChannelFastInputStream is = new ChannelFastInputStream(channel, pos);
-    dis.read(buf);
-    for (int i=0; i<buf.length; i++) {
-      if (buf[i] != END_MESSAGE.charAt(i)) return false;
-    }
+      dis.read(buf);
+      for (int i=0; i<buf.length; i++) {
+        if (buf[i] != END_MESSAGE.charAt(i)) return false;
+      }
     } finally {
       dis.close();
     }
@@ -176,10 +170,8 @@ public class HdfsTransactionLog extends TransactionLog {
   public void rollback(long pos) throws IOException {
     synchronized (this) {
       assert snapshot_size == pos;
-      fos.flush();
-      tlogOutStream.hflush();
+      ensureFlushed();
       // TODO: how do we rollback with hdfs?? We need HDFS-3107
-      //raf.setLength(pos);
       fos.setWritten(pos);
       assert fos.size() == pos;
       numRecords = snapshot_numRecords;
@@ -233,8 +225,10 @@ public class HdfsTransactionLog extends TransactionLog {
 
         endRecord(pos);
         
-        fos.flush();  // flush since this will be the last record in a log fill
-        tlogOutStream.hflush();
+        ensureFlushed();  // flush since this will be the last record in a log fill
+
+        // now the commit command is written we will never write to this log again
+        closeOutput();
 
         //assert fos.size() == channel.size();
 
@@ -255,19 +249,7 @@ public class HdfsTransactionLog extends TransactionLog {
 
     try {
       // make sure any unflushed buffer has been flushed
-      synchronized (this) {
-        // TODO: optimize this by keeping track of what we have flushed up to
-        fos.flushBuffer();
-        
-        // flush to hdfs
-        tlogOutStream.hflush();
-        /***
-         System.out.println("###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
-        if (fos.size() != raf.length() || pos >= fos.size() ) {
-          throw new RuntimeException("ERROR" + "###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
-        }
-        ***/
-      }
+      ensureFlushed();
 
       FSDataFastInputStream dis = new FSDataFastInputStream(fs.open(tlogFile),
           pos);
@@ -284,6 +266,52 @@ public class HdfsTransactionLog extends TransactionLog {
   }
 
   @Override
+  public void closeOutput() {
+    try {
+      doCloseOutput();
+    } catch (IOException e) {
+      log.error("Could not close tlog output", e);
+      // This situation is not fatal to the caller
+    }
+  }
+
+  private void doCloseOutput() throws IOException {
+    synchronized (this) {
+      if (fos == null) return;
+      if (debug) {
+        log.debug("Closing output for " + tlogFile);
+      }
+      fos.flushBuffer();
+      finalLogSize = fos.size();
+      fos = null;
+    }
+
+    tlogOutStream.hflush();
+    tlogOutStream.close();
+    tlogOutStream = null;
+  }
+
+  private void ensureFlushed() throws IOException {
+    synchronized (this) {
+      if (fos != null) {
+        fos.flush();
+        tlogOutStream.hflush();
+      }
+    }
+  }
+
+  @Override
+  public long getLogSize() {
+    synchronized (this) {
+      if (fos != null) {
+        return fos.size();
+      } else {
+        return finalLogSize;
+      }
+    }
+  }
+
+  @Override
   public void finish(UpdateLog.SyncLevel syncLevel) {
     if (syncLevel == UpdateLog.SyncLevel.NONE) return;
     try {
@@ -309,12 +337,7 @@ public class HdfsTransactionLog extends TransactionLog {
         log.debug("Closing tlog" + this);
       }
 
-      synchronized (this) {
-        fos.flushBuffer();
-      }
-      
-      tlogOutStream.hflush();
-      tlogOutStream.close();
+      doCloseOutput();
 
     } catch (IOException e) {
       log.error("Exception closing tlog.", e);
@@ -359,17 +382,19 @@ public class HdfsTransactionLog extends TransactionLog {
     public HDFSLogReader(long startingPos) {
       super();
       incref();
+      initStream(startingPos);
+    }
+
+    private void initStream(long pos) {
       try {
         
         synchronized (HdfsTransactionLog.this) {
-          fos.flushBuffer();
-          sz = fos.size();
+          ensureFlushed();
+          sz = getLogSize();
         }
-        
-        tlogOutStream.hflush();
-        
+
         FSDataInputStream fdis = fs.open(tlogFile);
-        fis = new FSDataFastInputStream(fdis, startingPos);
+        fis = new FSDataFastInputStream(fdis, pos);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -385,10 +410,10 @@ public class HdfsTransactionLog extends TransactionLog {
 
       synchronized (HdfsTransactionLog.this) {
         if (trace) {
-          log.trace("Reading log record.  pos="+pos+" currentSize="+fos.size());
+          log.trace("Reading log record.  pos="+pos+" currentSize="+getLogSize());
         }
 
-        if (pos >= fos.size()) {
+        if (pos >= getLogSize()) {
           return null;
         }
       }
@@ -398,16 +423,8 @@ public class HdfsTransactionLog extends TransactionLog {
       if (pos >= sz) {
         log.info("Read available inputstream data, opening new inputstream pos={} sz={}", pos, sz);
         
-        synchronized (HdfsTransactionLog.this) {
-          fos.flushBuffer();
-          sz = fos.size();
-        }
-        
-        tlogOutStream.hflush();
         fis.close();
-   
-        FSDataInputStream fdis = fs.open(tlogFile);
-        fis = new FSDataFastInputStream(fdis, pos);
+        initStream(pos);
       }
       
       if (pos == 0) {
@@ -415,7 +432,7 @@ public class HdfsTransactionLog extends TransactionLog {
 
         // shouldn't currently happen - header and first record are currently written at the same time
         synchronized (HdfsTransactionLog.this) {
-          if (fis.position() >= fos.size()) {
+          if (fis.position() >= getLogSize()) {
             return null;
           }
           pos = fis.position();
@@ -443,7 +460,7 @@ public class HdfsTransactionLog extends TransactionLog {
     @Override
     public String toString() {
       synchronized (HdfsTransactionLog.this) {
-        return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + fos.size() + "}";
+        return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + getLogSize() + "}";
       }
     }
     
@@ -454,7 +471,7 @@ public class HdfsTransactionLog extends TransactionLog {
     
     @Override
     public long currentSize() {
-      return fos.size();
+      return getLogSize();
     }
 
   }
@@ -478,12 +495,8 @@ public class HdfsTransactionLog extends TransactionLog {
 
       long sz;
       synchronized (HdfsTransactionLog.this) {
-        fos.flushBuffer();
-        
-        // this must be an hflush
-        tlogOutStream.hflush();
-        sz = fos.size();
-        //assert sz == channel.size();
+        ensureFlushed();
+        sz = getLogSize();
       }
 
       fis = new FSDataFastInputStream(fs.open(tlogFile), 0);
@@ -554,7 +567,7 @@ public class HdfsTransactionLog extends TransactionLog {
     @Override
     public String toString() {
       synchronized (HdfsTransactionLog.this) {
-        return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + fos.size() + "}";
+        return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + getLogSize() + "}";
       }
     }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/6bff06ce/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java b/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
index 4cbcf4f..764b099 100644
--- a/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
@@ -219,8 +219,13 @@ public class HdfsUpdateLog extends UpdateLog {
     // It's possible that at abnormal close both "tlog" and "prevTlog" were
     // uncapped.
     for (TransactionLog ll : logs) {
-      newestLogsOnStartup.addFirst(ll);
-      if (newestLogsOnStartup.size() >= 2) break;
+      if (newestLogsOnStartup.size() < 2) {
+        newestLogsOnStartup.addFirst(ll);
+      } else {
+        // We're never going to modify old non-recovery logs - no need to hold their output open
+        log.info("Closing output for old non-recovery log " + ll);
+        ll.closeOutput();
+      }
     }
     
     try {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/6bff06ce/solr/core/src/java/org/apache/solr/update/TransactionLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
index f7213ed..997485a 100644
--- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
@@ -520,6 +520,11 @@ public class TransactionLog implements Closeable {
     }
   }
 
+  /** Move to a read-only state, closing and releasing resources while keeping the log available for reads */
+  public void closeOutput() {
+
+  }
+
   public void finish(UpdateLog.SyncLevel syncLevel) {
     if (syncLevel == UpdateLog.SyncLevel.NONE) return;
     try {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/6bff06ce/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 0b4fc18..5b917b8 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -821,11 +821,13 @@ public class UpdateLog implements PluginInfoInitialized {
 
       try {
         if (ll.endsWithCommit()) {
+          ll.closeOutput();
           ll.decref();
           continue;
         }
       } catch (IOException e) {
         log.error("Error inspecting tlog " + ll, e);
+        ll.closeOutput();
         ll.decref();
         continue;
       }