You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by db...@apache.org on 2013/01/08 06:20:40 UTC

git commit: don't report harmless cache read problem as debug, don't rely on FileInputStream.available to always return > 0 patch by dbrosius reviewed by mkjellman for CASSANDRA-4916

Updated Branches:
  refs/heads/trunk 95c1ed468 -> 1d641f511


don't report harmless cache read problem as debug, don't rely on FileInputStream.available to always return > 0
patch by dbrosius reviewed by mkjellman for CASSANDRA-4916


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d641f51
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d641f51
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d641f51

Branch: refs/heads/trunk
Commit: 1d641f5111613a5a049042a8723d0dd9ffc29c02
Parents: 95c1ed4
Author: Dave Brosius <db...@apache.org>
Authored: Tue Jan 8 00:16:44 2013 -0500
Committer: Dave Brosius <db...@apache.org>
Committed: Tue Jan 8 00:16:44 2013 -0500

----------------------------------------------------------------------
 .../apache/cassandra/cache/AutoSavingCache.java    |    9 +-
 .../io/util/LengthAvailableInputStream.java        |   98 +++++++++++++++
 2 files changed, 103 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d641f51/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 4e35d34..7e4bb77 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.LengthAvailableInputStream;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
@@ -109,7 +110,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             try
             {
                 logger.info(String.format("reading saved cache %s", path));
-                in = new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
+                in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(new FileInputStream(path)), path.length()));
                 Set<ByteBuffer> keys = new HashSet<ByteBuffer>();
                 while (in.available() > 0)
                 {
@@ -120,7 +121,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             }
             catch (Exception e)
             {
-                logger.warn(String.format("error reading saved cache %s, keys loaded so far: %d", path.getAbsolutePath(), count), e);
+                logger.debug(String.format("harmless error reading saved cache %s fully, keys loaded so far: %d", path.getAbsolutePath(), count), e);
                 return count;
             }
             finally
@@ -137,7 +138,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             try
             {
                 logger.info(String.format("reading saved cache %s", path));
-                in = new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
+                in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(new FileInputStream(path)), path.length()));
                 List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>();
                 while (in.available() > 0)
                 {
@@ -157,7 +158,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             }
             catch (Exception e)
             {
-                logger.warn(String.format("error reading saved cache %s", path.getAbsolutePath()), e);
+                logger.debug(String.format("harmless error reading saved cache %s", path.getAbsolutePath()), e);
             }
             finally
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d641f51/src/java/org/apache/cassandra/io/util/LengthAvailableInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/LengthAvailableInputStream.java b/src/java/org/apache/cassandra/io/util/LengthAvailableInputStream.java
new file mode 100644
index 0000000..a467ce2
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/LengthAvailableInputStream.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * a FilterInputStream that returns the remaining bytes to read from available()
+ * regardless of whether the device is ready to provide them.
+ */
+public class LengthAvailableInputStream extends FilterInputStream
+{
+    private long remainingBytes;
+
+    public LengthAvailableInputStream(InputStream in, long totalLength)
+    {
+        super(in);
+        remainingBytes = totalLength;
+    }
+
+    @Override
+    public int read() throws IOException
+    {
+        int b = in.read();
+        --remainingBytes;
+        return b;
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException
+    {
+        int length = in.read(b);
+        remainingBytes -= length;
+        return length;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException
+    {
+        int length = in.read(b, off, len);
+        remainingBytes -= length;
+        return length;
+    }
+
+    @Override
+    public long skip(long n) throws IOException
+    {
+        long length = in.skip(n);
+        remainingBytes -= length;
+        return length;
+    }
+
+    @Override
+    public int available() throws IOException
+    {
+        return (remainingBytes <= 0) ? 0 : ((remainingBytes > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)remainingBytes);
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        in.close();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit)
+    {
+    }
+
+    @Override
+    public synchronized void reset() throws IOException
+    {
+        throw new IOException("Mark/Reset not supported");
+    }
+
+    @Override
+    public boolean markSupported()
+    {
+        return false;
+    }
+}