You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2017/02/15 15:46:07 UTC

cassandra git commit: Remove pre-3.0 streaming compatibility code for 4.0

Repository: cassandra
Updated Branches:
  refs/heads/trunk 31eac784f -> d81dc27c7


Remove pre-3.0 streaming compatibility code for 4.0

Patch by Paulo Motta; Reviewed by Sylvain Lebresne for CASSANDRA-13081


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d81dc27c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d81dc27c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d81dc27c

Branch: refs/heads/trunk
Commit: d81dc27c7bde7c44a3c00526d803d9d5c7fe2604
Parents: 31eac78
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Jan 4 12:04:09 2017 -0200
Committer: Paulo Motta <pa...@apache.org>
Committed: Wed Feb 15 13:45:53 2017 -0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamReader.java       | 37 ++------------------
 .../compress/CompressedStreamReader.java        | 10 ++----
 3 files changed, 5 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d81dc27c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6efcaa3..0a76400 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Remove pre-3.0 streaming compatibility code for 4.0 (CASSANDRA-13081)
  * Add support for + and - operations on dates (CASSANDRA-11936)
  * Fix consistency of incrementally repaired data (CASSANDRA-9143)
  * Increase commitlog version (CASSANDRA-13161)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d81dc27c/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index fdc2ae2..7d00e48 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -40,7 +40,6 @@ import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
 import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.util.RewindableDataInputStreamPlus;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -118,20 +117,14 @@ public class StreamReader
         }
         catch (Throwable e)
         {
-            if (deserializer != null)
-                logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
-                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getTableName(), e);
+            logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
+                        session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getTableName(), e);
             if (writer != null)
             {
                 writer.abort(e);
             }
             throw Throwables.propagate(e);
         }
-        finally
-        {
-            if (deserializer != null)
-                deserializer.cleanup();
-        }
     }
 
     protected SerializationHeader getHeader(TableMetadata metadata)
@@ -166,13 +159,6 @@ public class StreamReader
 
     public static class StreamDeserializer extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator
     {
-        public static final int INITIAL_MEM_BUFFER_SIZE = Integer.getInteger("cassandra.streamdes.initial_mem_buffer_size", 32768);
-        public static final int MAX_MEM_BUFFER_SIZE = Integer.getInteger("cassandra.streamdes.max_mem_buffer_size", 1048576);
-        public static final int MAX_SPILL_FILE_SIZE = Integer.getInteger("cassandra.streamdes.max_spill_file_size", Integer.MAX_VALUE);
-
-        public static final String BUFFER_FILE_PREFIX = "buf";
-        public static final String BUFFER_FILE_SUFFIX = "dat";
-
         private final TableMetadata metadata;
         private final DataInputPlus in;
         private final SerializationHeader header;
@@ -279,24 +265,5 @@ public class StreamReader
         public void close()
         {
         }
-
-        /* We have a separate cleanup method because sometimes close is called before exhausting the
-           StreamDeserializer (for instance, when enclosed in an try-with-resources wrapper, such as in
-           BigTableWriter.append()).
-         */
-        public void cleanup()
-        {
-            if (in instanceof RewindableDataInputStreamPlus)
-            {
-                try
-                {
-                    ((RewindableDataInputStreamPlus) in).close(false);
-                }
-                catch (IOException e)
-                {
-                    logger.warn("Error while closing RewindableDataInputStreamPlus.", e);
-                }
-            }
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d81dc27c/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index da62aa9..3e53fa2 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -110,9 +110,8 @@ public class CompressedStreamReader extends StreamReader
         }
         catch (Throwable e)
         {
-            if (deserializer != null)
-                logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
-                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getTableName());
+            logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
+                        session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getTableName());
             if (writer != null)
             {
                 writer.abort(e);
@@ -121,11 +120,6 @@ public class CompressedStreamReader extends StreamReader
                 throw e;
             throw Throwables.propagate(e);
         }
-        finally
-        {
-            if (deserializer != null)
-                deserializer.cleanup();
-        }
     }
 
     @Override