You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/02/24 00:07:03 UTC

svn commit: r915575 [2/3] - in /incubator/cassandra/trunk: ./ contrib/word_count/src/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/auth/ src/java/org/apache/cassandra/avro/ src/java/org/apache/cassandra/cache/ sr...

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java Tue Feb 23 23:07:01 2010
@@ -1,73 +1,94 @@
-package org.apache.cassandra.dht;
-
-import java.util.*;
-
-import org.apache.cassandra.service.StorageService;
-
-public class Bounds extends AbstractBounds
-{
-    public Bounds(Token left, Token right)
-    {
-        this(left, right, StorageService.getPartitioner());
-    }
-
-    Bounds(Token left, Token right, IPartitioner partitioner)
-    {
-        super(left, right, partitioner);
-        // unlike a Range, a Bounds may not wrap
-        assert left.compareTo(right) <= 0 || right.equals(partitioner.getMinimumToken()) : "[" + left + "," + right + "]";
-    }
-
-    @Override
-    public boolean contains(Token token)
-    {
-        return Range.contains(left, right, token) || left.equals(token);
-    }
-
-    public Set<AbstractBounds> restrictTo(Range range)
-    {
-        Token min = partitioner.getMinimumToken();
-
-        // special case Bounds where left=right (single Token)
-        if (this.left.equals(this.right) && !this.right.equals(min))
-            return range.contains(this.left)
-                   ? Collections.unmodifiableSet(new HashSet<AbstractBounds>(Arrays.asList(this)))
-                   : Collections.<AbstractBounds>emptySet();
-
-        // get the intersection of a Range w/ same left & right
-        Set<Range> ranges = range.intersectionWith(new Range(this.left, this.right));
-        // if range doesn't contain left token anyway, that's the correct answer
-        if (!range.contains(this.left))
-            return (Set) ranges;
-        // otherwise, add back in the left token
-        Set<AbstractBounds> S = new HashSet<AbstractBounds>(ranges.size());
-        for (Range restricted : ranges)
-        {
-            if (restricted.left.equals(this.left))
-                S.add(new Bounds(restricted.left, restricted.right));
-            else
-                S.add(restricted);
-        }
-        return Collections.unmodifiableSet(S);
-    }
-
-    public List<AbstractBounds> unwrap()
-    {
-        // Bounds objects never wrap
-        return (List)Arrays.asList(this);
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (!(o instanceof Bounds))
-            return false;
-        Bounds rhs = (Bounds)o;
-        return left.equals(rhs.left) && right.equals(rhs.right);
-    }
-
-    public String toString()
-    {
-        return "[" + left + "," + right + "]";
-    }
-}
+package org.apache.cassandra.dht;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.util.*;
+
+import org.apache.cassandra.service.StorageService;
+
+public class Bounds extends AbstractBounds
+{
+    public Bounds(Token left, Token right)
+    {
+        this(left, right, StorageService.getPartitioner());
+    }
+
+    Bounds(Token left, Token right, IPartitioner partitioner)
+    {
+        super(left, right, partitioner);
+        // unlike a Range, a Bounds may not wrap
+        assert left.compareTo(right) <= 0 || right.equals(partitioner.getMinimumToken()) : "[" + left + "," + right + "]";
+    }
+
+    @Override
+    public boolean contains(Token token)
+    {
+        return Range.contains(left, right, token) || left.equals(token);
+    }
+
+    public Set<AbstractBounds> restrictTo(Range range)
+    {
+        Token min = partitioner.getMinimumToken();
+
+        // special case Bounds where left=right (single Token)
+        if (this.left.equals(this.right) && !this.right.equals(min))
+            return range.contains(this.left)
+                   ? Collections.unmodifiableSet(new HashSet<AbstractBounds>(Arrays.asList(this)))
+                   : Collections.<AbstractBounds>emptySet();
+
+        // get the intersection of a Range w/ same left & right
+        Set<Range> ranges = range.intersectionWith(new Range(this.left, this.right));
+        // if range doesn't contain left token anyway, that's the correct answer
+        if (!range.contains(this.left))
+            return (Set) ranges;
+        // otherwise, add back in the left token
+        Set<AbstractBounds> S = new HashSet<AbstractBounds>(ranges.size());
+        for (Range restricted : ranges)
+        {
+            if (restricted.left.equals(this.left))
+                S.add(new Bounds(restricted.left, restricted.right));
+            else
+                S.add(restricted);
+        }
+        return Collections.unmodifiableSet(S);
+    }
+
+    public List<AbstractBounds> unwrap()
+    {
+        // Bounds objects never wrap
+        return (List)Arrays.asList(this);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof Bounds))
+            return false;
+        Bounds rhs = (Bounds)o;
+        return left.equals(rhs.left) && right.equals(rhs.right);
+    }
+
+    public String toString()
+    {
+        return "[" + left + "," + right + "]";
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.hadoop;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -226,4 +247,4 @@
     {
         return new ColumnFamilyRecordReader();
     }
-}
\ No newline at end of file
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.hadoop;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -188,4 +209,4 @@
     {
         return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp);
     }
-}
\ No newline at end of file
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.hadoop;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -132,4 +153,4 @@
         w.readFields(in);
         return w;
     }
-}
\ No newline at end of file
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java Tue Feb 23 23:07:01 2010
@@ -1,67 +1,88 @@
-package org.apache.cassandra.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-public class DeletionService
-{
-    public static final int MAX_RETRIES = 10;
-
-    public static final ExecutorService executor = new JMXEnabledThreadPoolExecutor("FILEUTILS-DELETE-POOL");
-
-    public static void submitDelete(final String file)
-    {
-        Runnable deleter = new WrappedRunnable()
-        {
-            @Override
-            protected void runMayThrow() throws IOException
-            {
-                FileUtils.deleteWithConfirm(new File(file));
-            }
-        };
-        executor.submit(deleter);
-    }
-
-    public static void submitDeleteWithRetry(String file)
-    {
-        submitDeleteWithRetry(file, 0);
-    }
-
-    private static void submitDeleteWithRetry(final String file, final int retryCount)
-    {
-        Runnable deleter = new WrappedRunnable()
-        {
-            @Override
-            protected void runMayThrow() throws IOException
-            {
-                if (!new File(file).delete())
-                {
-                    if (retryCount > MAX_RETRIES)
-                        throw new IOException("Unable to delete " + file + " after " + MAX_RETRIES + " tries");
-                    new Thread(new Runnable()
-                    {
-                        public void run()
-                        {
-                            try
-                            {
-                                Thread.sleep(10000);
-                            }
-                            catch (InterruptedException e)
-                            {
-                                throw new AssertionError(e);
-                            }
-                            submitDeleteWithRetry(file, retryCount + 1);
-                        }
-                    }, "Delete submission: " + file).start();
-                }
-            }
-        };
-        executor.submit(deleter);
-    }
-}
+package org.apache.cassandra.io;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+public class DeletionService
+{
+    public static final int MAX_RETRIES = 10;
+
+    public static final ExecutorService executor = new JMXEnabledThreadPoolExecutor("FILEUTILS-DELETE-POOL");
+
+    public static void submitDelete(final String file)
+    {
+        Runnable deleter = new WrappedRunnable()
+        {
+            @Override
+            protected void runMayThrow() throws IOException
+            {
+                FileUtils.deleteWithConfirm(new File(file));
+            }
+        };
+        executor.submit(deleter);
+    }
+
+    public static void submitDeleteWithRetry(String file)
+    {
+        submitDeleteWithRetry(file, 0);
+    }
+
+    private static void submitDeleteWithRetry(final String file, final int retryCount)
+    {
+        Runnable deleter = new WrappedRunnable()
+        {
+            @Override
+            protected void runMayThrow() throws IOException
+            {
+                if (!new File(file).delete())
+                {
+                    if (retryCount > MAX_RETRIES)
+                        throw new IOException("Unable to delete " + file + " after " + MAX_RETRIES + " tries");
+                    new Thread(new Runnable()
+                    {
+                        public void run()
+                        {
+                            try
+                            {
+                                Thread.sleep(10000);
+                            }
+                            catch (InterruptedException e)
+                            {
+                                throw new AssertionError(e);
+                            }
+                            submitDeleteWithRetry(file, retryCount + 1);
+                        }
+                    }, "Delete submission: " + file).start();
+                }
+            }
+        };
+        executor.submit(deleter);
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableDeletingReference.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableDeletingReference.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableDeletingReference.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableDeletingReference.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.io;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.io.File;
 import java.io.IOError;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java Tue Feb 23 23:07:01 2010
@@ -1,18 +1,39 @@
-package org.apache.cassandra.io.util;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.io.Closeable;
-
-public interface FileDataInput extends DataInput, Closeable
-{
-    public String getPath();
-
-    public boolean isEOF() throws IOException;
-
-    public void mark();
-
-    public void reset() throws IOException;
-
-    public int bytesPastMark();
-}
+package org.apache.cassandra.io.util;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.Closeable;
+
+public interface FileDataInput extends DataInput, Closeable
+{
+    public String getPath();
+
+    public boolean isEOF() throws IOException;
+
+    public void mark();
+
+    public void reset() throws IOException;
+
+    public int bytesPastMark();
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java Tue Feb 23 23:07:01 2010
@@ -1,404 +1,425 @@
-package org.apache.cassandra.io.util;
-
-import java.nio.MappedByteBuffer;
-import java.io.*;
-
-public class MappedFileDataInput extends InputStream implements FileDataInput
-{
-    private final MappedByteBuffer buffer;
-    private final String filename;
-    private int position;
-    private int markedPosition;
-
-    public MappedFileDataInput(MappedByteBuffer buffer, String filename)
-    {
-        this(buffer, filename, 0);
-    }
-
-    public MappedFileDataInput(MappedByteBuffer buffer, String filename, int position)
-    {
-        assert buffer != null;
-        this.buffer = buffer;
-        this.filename = filename;
-        this.position = position;
-    }
-
-    // don't make this public, this is only for seeking WITHIN the current mapped segment
-    private void seekInternal(int pos) throws IOException
-    {
-        position = pos;
-    }
-
-    @Override
-    public boolean markSupported()
-    {
-        return true;
-    }
-
-    @Override
-    public void mark(int ignored)
-    {
-        markedPosition = position;
-    }
-
-    @Override
-    public void reset() throws IOException
-    {
-        seekInternal(markedPosition);
-    }
-
-    public void mark()
-    {
-        mark(-1);
-    }
-
-    public int bytesPastMark()
-    {
-        assert position >= markedPosition;
-        return position - markedPosition;
-    }
-
-    public boolean isEOF() throws IOException
-    {
-        return position == buffer.capacity();
-    }
-
-    public String getPath()
-    {
-        return filename;
-    }
-
-    public int read() throws IOException
-    {
-        if (isEOF())
-            return -1;
-        return buffer.get(position++) & 0xFF;
-    }
-
-    public int skipBytes(int n) throws IOException
-    {
-        if (n <= 0)
-            return 0;
-        int oldPosition = position;
-        assert ((long)oldPosition) + n <= Integer.MAX_VALUE;
-        position = Math.min(buffer.capacity(), position + n);
-        return position - oldPosition;
-    }
-
-    /*
-     !! DataInput methods below are copied from the implementation in Apache Harmony RandomAccessFile.
-     */
-
-    /**
-     * Reads a boolean from the current position in this file. Blocks until one
-     * byte has been read, the end of the file is reached or an exception is
-     * thrown.
-     *
-     * @return the next boolean value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final boolean readBoolean() throws IOException {
-        int temp = this.read();
-        if (temp < 0) {
-            throw new EOFException();
-        }
-        return temp != 0;
-    }
-
-    /**
-     * Reads an 8-bit byte from the current position in this file. Blocks until
-     * one byte has been read, the end of the file is reached or an exception is
-     * thrown.
-     *
-     * @return the next signed 8-bit byte value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final byte readByte() throws IOException {
-        int temp = this.read();
-        if (temp < 0) {
-            throw new EOFException();
-        }
-        return (byte) temp;
-    }
-
-    /**
-     * Reads a 16-bit character from the current position in this file. Blocks until
-     * two bytes have been read, the end of the file is reached or an exception is
-     * thrown.
-     *
-     * @return the next char value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final char readChar() throws IOException {
-        byte[] buffer = new byte[2];
-        if (read(buffer, 0, buffer.length) != buffer.length) {
-            throw new EOFException();
-        }
-        return (char) (((buffer[0] & 0xff) << 8) + (buffer[1] & 0xff));
-    }
-
-    /**
-     * Reads a 64-bit double from the current position in this file. Blocks
-     * until eight bytes have been read, the end of the file is reached or an
-     * exception is thrown.
-     *
-     * @return the next double value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final double readDouble() throws IOException {
-        return Double.longBitsToDouble(readLong());
-    }
-
-    /**
-     * Reads a 32-bit float from the current position in this file. Blocks
-     * until four bytes have been read, the end of the file is reached or an
-     * exception is thrown.
-     *
-     * @return the next float value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final float readFloat() throws IOException {
-        return Float.intBitsToFloat(readInt());
-    }
-
-    /**
-     * Reads bytes from this file into {@code buffer}. Blocks until {@code
-     * buffer.length} number of bytes have been read, the end of the file is
-     * reached or an exception is thrown.
-     *
-     * @param buffer
-     *            the buffer to read bytes into.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     * @throws NullPointerException
-     *             if {@code buffer} is {@code null}.
-     */
-    public final void readFully(byte[] buffer) throws IOException {
-        readFully(buffer, 0, buffer.length);
-    }
-
-    /**
-     * Read bytes from this file into {@code buffer} starting at offset {@code
-     * offset}. This method blocks until {@code count} number of bytes have been
-     * read.
-     *
-     * @param buffer
-     *            the buffer to read bytes into.
-     * @param offset
-     *            the initial position in {@code buffer} to store the bytes read
-     *            from this file.
-     * @param count
-     *            the maximum number of bytes to store in {@code buffer}.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IndexOutOfBoundsException
-     *             if {@code offset < 0} or {@code count < 0}, or if {@code
-     *             offset + count} is greater than the length of {@code buffer}.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     * @throws NullPointerException
-     *             if {@code buffer} is {@code null}.
-     */
-    public final void readFully(byte[] buffer, int offset, int count)
-            throws IOException {
-        if (buffer == null) {
-            throw new NullPointerException();
-        }
-        // avoid int overflow
-        if (offset < 0 || offset > buffer.length || count < 0
-                || count > buffer.length - offset) {
-            throw new IndexOutOfBoundsException();
-        }
-        while (count > 0) {
-            int result = read(buffer, offset, count);
-            if (result < 0) {
-                throw new EOFException();
-            }
-            offset += result;
-            count -= result;
-        }
-    }
-
-    /**
-     * Reads a 32-bit integer from the current position in this file. Blocks
-     * until four bytes have been read, the end of the file is reached or an
-     * exception is thrown.
-     *
-     * @return the next int value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final int readInt() throws IOException {
-        byte[] buffer = new byte[4];
-        if (read(buffer, 0, buffer.length) != buffer.length) {
-            throw new EOFException();
-        }
-        return ((buffer[0] & 0xff) << 24) + ((buffer[1] & 0xff) << 16)
-                + ((buffer[2] & 0xff) << 8) + (buffer[3] & 0xff);
-    }
-
-    /**
-     * Reads a line of text form the current position in this file. A line is
-     * represented by zero or more characters followed by {@code '\n'}, {@code
-     * '\r'}, {@code "\r\n"} or the end of file marker. The string does not
-     * include the line terminating sequence.
-     * <p>
-     * Blocks until a line terminating sequence has been read, the end of the
-     * file is reached or an exception is thrown.
-     *
-     * @return the contents of the line or {@code null} if no characters have
-     *         been read before the end of the file has been reached.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final String readLine() throws IOException {
-        StringBuilder line = new StringBuilder(80); // Typical line length
-        boolean foundTerminator = false;
-        int unreadPosition = 0;
-        while (true) {
-            int nextByte = read();
-            switch (nextByte) {
-                case -1:
-                    return line.length() != 0 ? line.toString() : null;
-                case (byte) '\r':
-                    if (foundTerminator) {
-                        seekInternal(unreadPosition);
-                        return line.toString();
-                    }
-                    foundTerminator = true;
-                    /* Have to be able to peek ahead one byte */
-                    unreadPosition = position;
-                    break;
-                case (byte) '\n':
-                    return line.toString();
-                default:
-                    if (foundTerminator) {
-                        seekInternal(unreadPosition);
-                        return line.toString();
-                    }
-                    line.append((char) nextByte);
-            }
-        }
-    }
-
-    /**
-     * Reads a 64-bit long from the current position in this file. Blocks until
-     * eight bytes have been read, the end of the file is reached or an
-     * exception is thrown.
-     *
-     * @return the next long value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final long readLong() throws IOException {
-        byte[] buffer = new byte[8];
-        int n = read(buffer, 0, buffer.length);
-        if (n != buffer.length) {
-            throw new EOFException("expected 8 bytes; read " + n + " at final position " + position);
-        }
-        return ((long) (((buffer[0] & 0xff) << 24) + ((buffer[1] & 0xff) << 16)
-                + ((buffer[2] & 0xff) << 8) + (buffer[3] & 0xff)) << 32)
-                + ((long) (buffer[4] & 0xff) << 24)
-                + ((buffer[5] & 0xff) << 16)
-                + ((buffer[6] & 0xff) << 8)
-                + (buffer[7] & 0xff);
-    }
-
-    /**
-     * Reads a 16-bit short from the current position in this file. Blocks until
-     * two bytes have been read, the end of the file is reached or an exception
-     * is thrown.
-     *
-     * @return the next short value from this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final short readShort() throws IOException {
-        byte[] buffer = new byte[2];
-        if (read(buffer, 0, buffer.length) != buffer.length) {
-            throw new EOFException();
-        }
-        return (short) (((buffer[0] & 0xff) << 8) + (buffer[1] & 0xff));
-    }
-
-    /**
-     * Reads an unsigned 8-bit byte from the current position in this file and
-     * returns it as an integer. Blocks until one byte has been read, the end of
-     * the file is reached or an exception is thrown.
-     *
-     * @return the next unsigned byte value from this file as an int.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final int readUnsignedByte() throws IOException {
-        int temp = this.read();
-        if (temp < 0) {
-            throw new EOFException();
-        }
-        return temp;
-    }
-
-    /**
-     * Reads an unsigned 16-bit short from the current position in this file and
-     * returns it as an integer. Blocks until two bytes have been read, the end of
-     * the file is reached or an exception is thrown.
-     *
-     * @return the next unsigned short value from this file as an int.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     */
-    public final int readUnsignedShort() throws IOException {
-        byte[] buffer = new byte[2];
-        if (read(buffer, 0, buffer.length) != buffer.length) {
-            throw new EOFException();
-        }
-        return ((buffer[0] & 0xff) << 8) + (buffer[1] & 0xff);
-    }
-
-    /**
-     * Reads a string that is encoded in {@link DataInput modified UTF-8} from
-     * this file. The number of bytes that must be read for the complete string
-     * is determined by the first two bytes read from the file. Blocks until all
-     * required bytes have been read, the end of the file is reached or an
-     * exception is thrown.
-     *
-     * @return the next string encoded in {@link DataInput modified UTF-8} from
-     *         this file.
-     * @throws EOFException
-     *             if the end of this file is detected.
-     * @throws IOException
-     *             if this file is closed or another I/O error occurs.
-     * @throws UTFDataFormatException
-     *             if the bytes read cannot be decoded into a character string.
-     */
-    public final String readUTF() throws IOException {
-        return DataInputStream.readUTF(this);
-    }
-}
+package org.apache.cassandra.io.util;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.nio.MappedByteBuffer;
+import java.io.*;
+
+public class MappedFileDataInput extends InputStream implements FileDataInput
+{
+    private final MappedByteBuffer buffer;
+    private final String filename;
+    private int position;
+    private int markedPosition;
+
+    public MappedFileDataInput(MappedByteBuffer buffer, String filename)
+    {
+        this(buffer, filename, 0);
+    }
+
+    public MappedFileDataInput(MappedByteBuffer buffer, String filename, int position)
+    {
+        assert buffer != null;
+        this.buffer = buffer;
+        this.filename = filename;
+        this.position = position;
+    }
+
+    // don't make this public, this is only for seeking WITHIN the current mapped segment
+    private void seekInternal(int pos) throws IOException
+    {
+        position = pos;
+    }
+
+    @Override
+    public boolean markSupported()
+    {
+        return true;
+    }
+
+    @Override
+    public void mark(int ignored)
+    {
+        markedPosition = position;
+    }
+
+    @Override
+    public void reset() throws IOException
+    {
+        seekInternal(markedPosition);
+    }
+
+    public void mark()
+    {
+        mark(-1);
+    }
+
+    public int bytesPastMark()
+    {
+        assert position >= markedPosition;
+        return position - markedPosition;
+    }
+
+    public boolean isEOF() throws IOException
+    {
+        return position == buffer.capacity();
+    }
+
+    public String getPath()
+    {
+        return filename;
+    }
+
+    public int read() throws IOException
+    {
+        if (isEOF())
+            return -1;
+        return buffer.get(position++) & 0xFF;
+    }
+
+    public int skipBytes(int n) throws IOException
+    {
+        if (n <= 0)
+            return 0;
+        int oldPosition = position;
+        assert ((long)oldPosition) + n <= Integer.MAX_VALUE;
+        position = Math.min(buffer.capacity(), position + n);
+        return position - oldPosition;
+    }
+
+    /*
+     !! DataInput methods below are copied from the implementation in Apache Harmony RandomAccessFile.
+     */
+
+    /**
+     * Reads a boolean from the current position in this file. Blocks until one
+     * byte has been read, the end of the file is reached or an exception is
+     * thrown.
+     *
+     * @return the next boolean value from this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final boolean readBoolean() throws IOException {
+        int temp = this.read();
+        if (temp < 0) {
+            throw new EOFException();
+        }
+        return temp != 0;
+    }
+
+    /**
+     * Reads an 8-bit byte from the current position in this file. Blocks until
+     * one byte has been read, the end of the file is reached or an exception is
+     * thrown.
+     *
+     * @return the next signed 8-bit byte value from this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final byte readByte() throws IOException {
+        int temp = this.read();
+        if (temp < 0) {
+            throw new EOFException();
+        }
+        return (byte) temp;
+    }
+
+    /**
+     * Reads a 16-bit character from the current position in this file. Blocks until
+     * two bytes have been read, the end of the file is reached or an exception is
+     * thrown.
+     *
+     * @return the next char value from this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final char readChar() throws IOException {
+        byte[] buffer = new byte[2];
+        if (read(buffer, 0, buffer.length) != buffer.length) {
+            throw new EOFException();
+        }
+        return (char) (((buffer[0] & 0xff) << 8) + (buffer[1] & 0xff));
+    }
+
+    /**
+     * Reads a 64-bit double from the current position in this file. Blocks
+     * until eight bytes have been read, the end of the file is reached or an
+     * exception is thrown.
+     *
+     * @return the next double value from this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final double readDouble() throws IOException {
+        return Double.longBitsToDouble(readLong());
+    }
+
+    /**
+     * Reads a 32-bit float from the current position in this file. Blocks
+     * until four bytes have been read, the end of the file is reached or an
+     * exception is thrown.
+     *
+     * @return the next float value from this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final float readFloat() throws IOException {
+        return Float.intBitsToFloat(readInt());
+    }
+
+    /**
+     * Reads bytes from this file into {@code buffer}. Blocks until {@code
+     * buffer.length} number of bytes have been read, the end of the file is
+     * reached or an exception is thrown.
+     *
+     * @param buffer
+     *            the buffer to read bytes into.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     * @throws NullPointerException
+     *             if {@code buffer} is {@code null}.
+     */
+    public final void readFully(byte[] buffer) throws IOException {
+        readFully(buffer, 0, buffer.length);
+    }
+
+    /**
+     * Read bytes from this file into {@code buffer} starting at offset {@code
+     * offset}. This method blocks until {@code count} number of bytes have been
+     * read.
+     *
+     * @param buffer
+     *            the buffer to read bytes into.
+     * @param offset
+     *            the initial position in {@code buffer} to store the bytes read
+     *            from this file.
+     * @param count
+     *            the exact number of bytes to read into {@code buffer}.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IndexOutOfBoundsException
+     *             if {@code offset < 0} or {@code count < 0}, or if {@code
+     *             offset + count} is greater than the length of {@code buffer}.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     * @throws NullPointerException
+     *             if {@code buffer} is {@code null}.
+     */
+    public final void readFully(byte[] buffer, int offset, int count)
+            throws IOException {
+        if (buffer == null) {
+            throw new NullPointerException();
+        }
+        // avoid int overflow
+        if (offset < 0 || offset > buffer.length || count < 0
+                || count > buffer.length - offset) {
+            throw new IndexOutOfBoundsException();
+        }
+        while (count > 0) {
+            int result = read(buffer, offset, count);
+            if (result < 0) {
+                throw new EOFException();
+            }
+            offset += result;
+            count -= result;
+        }
+    }
+
+    /**
+     * Reads a 32-bit integer from the current position in this file. Blocks
+     * until four bytes have been read, the end of the file is reached or an
+     * exception is thrown.
+     *
+     * @return the next int value from this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final int readInt() throws IOException {
+        byte[] buffer = new byte[4];
+        if (read(buffer, 0, buffer.length) != buffer.length) {
+            throw new EOFException();
+        }
+        return ((buffer[0] & 0xff) << 24) + ((buffer[1] & 0xff) << 16)
+                + ((buffer[2] & 0xff) << 8) + (buffer[3] & 0xff);
+    }
+
+    /**
+     * Reads a line of text from the current position in this file. A line is
+     * represented by zero or more characters followed by {@code '\n'}, {@code
+     * '\r'}, {@code "\r\n"} or the end of file marker. The string does not
+     * include the line terminating sequence.
+     * <p>
+     * Blocks until a line terminating sequence has been read, the end of the
+     * file is reached or an exception is thrown.
+     *
+     * @return the contents of the line or {@code null} if no characters have
+     *         been read before the end of the file has been reached.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final String readLine() throws IOException {
+        StringBuilder line = new StringBuilder(80); // Typical line length
+        boolean foundTerminator = false;
+        int unreadPosition = 0;
+        while (true) {
+            int nextByte = read();
+            switch (nextByte) {
+                case -1:
+                    return line.length() != 0 ? line.toString() : null;
+                case (byte) '\r':
+                    if (foundTerminator) {
+                        seekInternal(unreadPosition);
+                        return line.toString();
+                    }
+                    foundTerminator = true;
+                    /* Have to be able to peek ahead one byte */
+                    unreadPosition = position;
+                    break;
+                case (byte) '\n':
+                    return line.toString();
+                default:
+                    if (foundTerminator) {
+                        seekInternal(unreadPosition);
+                        return line.toString();
+                    }
+                    line.append((char) nextByte);
+            }
+        }
+    }
+
+    /**
+     * Reads a 64-bit long from the current position in this file. Blocks until
+     * eight bytes have been read, the end of the file is reached or an
+     * exception is thrown.
+     *
+     * @return the next long value from this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final long readLong() throws IOException {
+        byte[] buffer = new byte[8];
+        int n = read(buffer, 0, buffer.length);
+        if (n != buffer.length) {
+            throw new EOFException("expected 8 bytes; read " + n + " at final position " + position);
+        }
+        return ((long) (((buffer[0] & 0xff) << 24) + ((buffer[1] & 0xff) << 16)
+                + ((buffer[2] & 0xff) << 8) + (buffer[3] & 0xff)) << 32)
+                + ((long) (buffer[4] & 0xff) << 24)
+                + ((buffer[5] & 0xff) << 16)
+                + ((buffer[6] & 0xff) << 8)
+                + (buffer[7] & 0xff);
+    }
+
+    /**
+     * Reads a 16-bit short from the current position in this file. Blocks until
+     * two bytes have been read, the end of the file is reached or an exception
+     * is thrown.
+     *
+     * @return the next short value from this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final short readShort() throws IOException {
+        byte[] buffer = new byte[2];
+        if (read(buffer, 0, buffer.length) != buffer.length) {
+            throw new EOFException();
+        }
+        return (short) (((buffer[0] & 0xff) << 8) + (buffer[1] & 0xff));
+    }
+
+    /**
+     * Reads an unsigned 8-bit byte from the current position in this file and
+     * returns it as an integer. Blocks until one byte has been read, the end of
+     * the file is reached or an exception is thrown.
+     *
+     * @return the next unsigned byte value from this file as an int.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final int readUnsignedByte() throws IOException {
+        int temp = this.read();
+        if (temp < 0) {
+            throw new EOFException();
+        }
+        return temp;
+    }
+
+    /**
+     * Reads an unsigned 16-bit short from the current position in this file and
+     * returns it as an integer. Blocks until two bytes have been read, the end of
+     * the file is reached or an exception is thrown.
+     *
+     * @return the next unsigned short value from this file as an int.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     */
+    public final int readUnsignedShort() throws IOException {
+        byte[] buffer = new byte[2];
+        if (read(buffer, 0, buffer.length) != buffer.length) {
+            throw new EOFException();
+        }
+        return ((buffer[0] & 0xff) << 8) + (buffer[1] & 0xff);
+    }
+
+    /**
+     * Reads a string that is encoded in {@link DataInput modified UTF-8} from
+     * this file. The number of bytes that must be read for the complete string
+     * is determined by the first two bytes read from the file. Blocks until all
+     * required bytes have been read, the end of the file is reached or an
+     * exception is thrown.
+     *
+     * @return the next string encoded in {@link DataInput modified UTF-8} from
+     *         this file.
+     * @throws EOFException
+     *             if the end of this file is detected.
+     * @throws IOException
+     *             if this file is closed or another I/O error occurs.
+     * @throws UTFDataFormatException
+     *             if the bytes read cannot be decoded into a character string.
+     */
+    public final String readUTF() throws IOException {
+        return DataInputStream.readUTF(this);
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Tue Feb 23 23:07:01 2010
@@ -1,69 +1,90 @@
-package org.apache.cassandra.net;
-
-import java.io.*;
-import java.net.Socket;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.streaming.IncomingStreamReader;
-
-public class IncomingTcpConnection extends Thread
-{
-    private static Logger logger = Logger.getLogger(IncomingTcpConnection.class);
-
-    private final DataInputStream input;
-    private Socket socket;
-
-    public IncomingTcpConnection(Socket socket)
-    {
-        this.socket = socket;
-        try
-        {
-            input = new DataInputStream(socket.getInputStream());
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
-    }
-
-    @Override
-    public void run()
-    {
-        while (true)
-        {
-            try
-            {
-                MessagingService.validateMagic(input.readInt());
-                int header = input.readInt();
-                int type = MessagingService.getBits(header, 1, 2);
-                boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
-                int version = MessagingService.getBits(header, 15, 8);
-
-                if (isStream)
-                {
-                    new IncomingStreamReader(socket.getChannel()).read();
-                }
-                else
-                {
-                    int size = input.readInt();
-                    byte[] contentBytes = new byte[size];
-                    input.readFully(contentBytes);
-                    MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(new ByteArrayInputStream(contentBytes)));
-                }
-            }
-            catch (EOFException e)
-            {
-                if (logger.isTraceEnabled())
-                    logger.trace("eof reading from socket; closing", e);
-                break;
-            }
-            catch (IOException e)
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("error reading from socket; closing", e);
-                break;
-            }
-        }
-    }
-}
+package org.apache.cassandra.net;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.*;
+import java.net.Socket;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.streaming.IncomingStreamReader;
+
+public class IncomingTcpConnection extends Thread
+{
+    private static Logger logger = Logger.getLogger(IncomingTcpConnection.class);
+
+    private final DataInputStream input;
+    private Socket socket;
+
+    public IncomingTcpConnection(Socket socket)
+    {
+        this.socket = socket;
+        try
+        {
+            input = new DataInputStream(socket.getInputStream());
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    @Override
+    public void run()
+    {
+        while (true)
+        {
+            try
+            {
+                MessagingService.validateMagic(input.readInt());
+                int header = input.readInt();
+                int type = MessagingService.getBits(header, 1, 2);
+                boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
+                int version = MessagingService.getBits(header, 15, 8);
+
+                if (isStream)
+                {
+                    new IncomingStreamReader(socket.getChannel()).read();
+                }
+                else
+                {
+                    int size = input.readInt();
+                    byte[] contentBytes = new byte[size];
+                    input.readFully(contentBytes);
+                    MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(new ByteArrayInputStream(contentBytes)));
+                }
+            }
+            catch (EOFException e)
+            {
+                if (logger.isTraceEnabled())
+                    logger.trace("eof reading from socket; closing", e);
+                break;
+            }
+            catch (IOException e)
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("error reading from socket; closing", e);
+                break;
+            }
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.net;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.io.DataOutputStream;
 import java.io.IOException;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.service;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.io.File;
 import java.io.FileOutputStream;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java Tue Feb 23 23:07:01 2010
@@ -1,100 +1,121 @@
-package org.apache.cassandra.streaming;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-
-class CompletedFileStatus
-{
-    private static ICompactSerializer<CompletedFileStatus> serializer_;
-
-    public static enum StreamCompletionAction
-    {
-        DELETE,
-        STREAM
-    }
-
-    static
-    {
-        serializer_ = new CompletedFileStatusSerializer();
-    }
-
-    public static ICompactSerializer<CompletedFileStatus> serializer()
-    {
-        return serializer_;
-    }
-
-    private String file_;
-    private long expectedBytes_;
-    private StreamCompletionAction action_;
-
-    public CompletedFileStatus(String file, long expectedBytes)
-    {
-        file_ = file;
-        expectedBytes_ = expectedBytes;
-        action_ = StreamCompletionAction.DELETE;
-    }
-
-    public String getFile()
-    {
-        return file_;
-    }
-
-    public long getExpectedBytes()
-    {
-        return expectedBytes_;
-    }
-
-    public void setAction(StreamCompletionAction action)
-    {
-        action_ = action;
-    }
-
-    public StreamCompletionAction getAction()
-    {
-        return action_;
-    }
-
-    public Message makeStreamStatusMessage() throws IOException
-    {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream( bos );
-        CompletedFileStatus.serializer().serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_FINISHED, bos.toByteArray());
-    }
-
-    private static class CompletedFileStatusSerializer implements ICompactSerializer<CompletedFileStatus>
-    {
-        public void serialize(CompletedFileStatus streamStatus, DataOutputStream dos) throws IOException
-        {
-            dos.writeUTF(streamStatus.getFile());
-            dos.writeLong(streamStatus.getExpectedBytes());
-            dos.writeInt(streamStatus.getAction().ordinal());
-        }
-
-        public CompletedFileStatus deserialize(DataInputStream dis) throws IOException
-        {
-            String targetFile = dis.readUTF();
-            long expectedBytes = dis.readLong();
-            CompletedFileStatus streamStatus = new CompletedFileStatus(targetFile, expectedBytes);
-
-            int ordinal = dis.readInt();
-            if ( ordinal == StreamCompletionAction.DELETE.ordinal() )
-            {
-                streamStatus.setAction(StreamCompletionAction.DELETE);
-            }
-            else if ( ordinal == StreamCompletionAction.STREAM.ordinal() )
-            {
-                streamStatus.setAction(StreamCompletionAction.STREAM);
-            }
-
-            return streamStatus;
-        }
-    }
-}
+package org.apache.cassandra.streaming;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+class CompletedFileStatus
+{
+    private static ICompactSerializer<CompletedFileStatus> serializer_;
+
+    public static enum StreamCompletionAction
+    {
+        DELETE,
+        STREAM
+    }
+
+    static
+    {
+        serializer_ = new CompletedFileStatusSerializer();
+    }
+
+    public static ICompactSerializer<CompletedFileStatus> serializer()
+    {
+        return serializer_;
+    }
+
+    private String file_;
+    private long expectedBytes_;
+    private StreamCompletionAction action_;
+
+    public CompletedFileStatus(String file, long expectedBytes)
+    {
+        file_ = file;
+        expectedBytes_ = expectedBytes;
+        action_ = StreamCompletionAction.DELETE;
+    }
+
+    public String getFile()
+    {
+        return file_;
+    }
+
+    public long getExpectedBytes()
+    {
+        return expectedBytes_;
+    }
+
+    public void setAction(StreamCompletionAction action)
+    {
+        action_ = action;
+    }
+
+    public StreamCompletionAction getAction()
+    {
+        return action_;
+    }
+
+    public Message makeStreamStatusMessage() throws IOException
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream( bos );
+        CompletedFileStatus.serializer().serialize(this, dos);
+        return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_FINISHED, bos.toByteArray());
+    }
+
+    private static class CompletedFileStatusSerializer implements ICompactSerializer<CompletedFileStatus>
+    {
+        public void serialize(CompletedFileStatus streamStatus, DataOutputStream dos) throws IOException
+        {
+            dos.writeUTF(streamStatus.getFile());
+            dos.writeLong(streamStatus.getExpectedBytes());
+            dos.writeInt(streamStatus.getAction().ordinal());
+        }
+
+        public CompletedFileStatus deserialize(DataInputStream dis) throws IOException
+        {
+            String targetFile = dis.readUTF();
+            long expectedBytes = dis.readLong();
+            CompletedFileStatus streamStatus = new CompletedFileStatus(targetFile, expectedBytes);
+
+            int ordinal = dis.readInt();
+            if ( ordinal == StreamCompletionAction.DELETE.ordinal() )
+            {
+                streamStatus.setAction(StreamCompletionAction.DELETE);
+            }
+            else if ( ordinal == StreamCompletionAction.STREAM.ordinal() )
+            {
+                streamStatus.setAction(StreamCompletionAction.STREAM);
+            }
+
+            return streamStatus;
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.streaming;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.streaming;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.io.File;
 import java.io.IOException;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java Tue Feb 23 23:07:01 2010
@@ -1,48 +1,69 @@
-package org.apache.cassandra.streaming;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOError;
-import java.io.IOException;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.streaming.StreamOutManager;
-
-public class StreamFinishedVerbHandler implements IVerbHandler
-{
-    private static Logger logger = Logger.getLogger(StreamFinishedVerbHandler.class);
-
-    public void doVerb(Message message)
-    {
-        byte[] body = message.getMessageBody();
-        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-
-        try
-        {
-            CompletedFileStatus streamStatus = CompletedFileStatus.serializer().deserialize(new DataInputStream(bufIn));
-
-            switch (streamStatus.getAction())
-            {
-                case DELETE:
-                    StreamOutManager.get(message.getFrom()).finishAndStartNext(streamStatus.getFile());
-                    break;
-
-                case STREAM:
-                    if (logger.isDebugEnabled())
-                        logger.debug("Need to re-stream file " + streamStatus.getFile());
-                    StreamOutManager.get(message.getFrom()).startNext();
-                    break;
-
-                default:
-                    break;
-            }
-        }
-        catch (IOException ex)
-        {
-            throw new IOError(ex);
-        }
-    }
-}
+package org.apache.cassandra.streaming;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOError;
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.streaming.StreamOutManager;
+
+public class StreamFinishedVerbHandler implements IVerbHandler
+{
+    private static Logger logger = Logger.getLogger(StreamFinishedVerbHandler.class);
+
+    public void doVerb(Message message)
+    {
+        byte[] body = message.getMessageBody();
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+
+        try
+        {
+            CompletedFileStatus streamStatus = CompletedFileStatus.serializer().deserialize(new DataInputStream(bufIn));
+
+            switch (streamStatus.getAction())
+            {
+                case DELETE:
+                    StreamOutManager.get(message.getFrom()).finishAndStartNext(streamStatus.getFile());
+                    break;
+
+                case STREAM:
+                    if (logger.isDebugEnabled())
+                        logger.debug("Need to re-stream file " + streamStatus.getFile());
+                    StreamOutManager.get(message.getFrom()).startNext();
+                    break;
+
+                default:
+                    break;
+            }
+        }
+        catch (IOException ex)
+        {
+            throw new IOError(ex);
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.streaming;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.net.InetAddress;
 import java.util.Collection;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java Tue Feb 23 23:07:01 2010
@@ -1,19 +1,40 @@
-package org.apache.cassandra.streaming;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.streaming.StreamOutManager;
-
-public class StreamInitiateDoneVerbHandler implements IVerbHandler
-{
-    private static Logger logger = Logger.getLogger(StreamInitiateDoneVerbHandler.class);
-
-    public void doVerb(Message message)
-    {
-        if (logger.isDebugEnabled())
-          logger.debug("Received a stream initiate done message ...");
-        StreamOutManager.get(message.getFrom()).startNext();
-    }
-}
+package org.apache.cassandra.streaming;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.streaming.StreamOutManager;
+
+public class StreamInitiateDoneVerbHandler implements IVerbHandler
+{
+    private static Logger logger = Logger.getLogger(StreamInitiateDoneVerbHandler.class);
+
+    public void doVerb(Message message)
+    {
+        if (logger.isDebugEnabled())
+          logger.debug("Received a stream initiate done message ...");
+        StreamOutManager.get(message.getFrom()).startNext();
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.streaming;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.io.*;
 import java.net.InetAddress;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java Tue Feb 23 23:07:01 2010
@@ -1,76 +1,97 @@
-package org.apache.cassandra.streaming;
-
-import java.io.*;
-
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
-* This class encapsulates the message that needs to be sent to nodes
-* that handoff data. The message contains information about ranges
-* that need to be transferred and the target node.
-*/
-class StreamRequestMessage
-{
-   private static ICompactSerializer<StreamRequestMessage> serializer_;
-   static
-   {
-       serializer_ = new StreamRequestMessageSerializer();
-   }
-
-   protected static ICompactSerializer<StreamRequestMessage> serializer()
-   {
-       return serializer_;
-   }
-
-   protected static Message makeStreamRequestMessage(StreamRequestMessage streamRequestMessage)
-   {
-       ByteArrayOutputStream bos = new ByteArrayOutputStream();
-       DataOutputStream dos = new DataOutputStream(bos);
-       try
-       {
-           StreamRequestMessage.serializer().serialize(streamRequestMessage, dos);
-       }
-       catch (IOException e)
-       {
-           throw new IOError(e);
-       }
-       return new Message(FBUtilities.getLocalAddress(), StageManager.STREAM_STAGE, StorageService.Verb.STREAM_REQUEST, bos.toByteArray() );
-   }
-
-   protected StreamRequestMetadata[] streamRequestMetadata_ = new StreamRequestMetadata[0];
-
-   // TODO only actually ever need one BM, not an array
-   StreamRequestMessage(StreamRequestMetadata... streamRequestMetadata)
-   {
-       assert streamRequestMetadata != null;
-       streamRequestMetadata_ = streamRequestMetadata;
-   }
-
-    private static class StreamRequestMessageSerializer implements ICompactSerializer<StreamRequestMessage>
-    {
-        public void serialize(StreamRequestMessage streamRequestMessage, DataOutputStream dos) throws IOException
-        {
-            StreamRequestMetadata[] streamRequestMetadata = streamRequestMessage.streamRequestMetadata_;
-            dos.writeInt(streamRequestMetadata.length);
-            for (StreamRequestMetadata bsmd : streamRequestMetadata)
-            {
-                StreamRequestMetadata.serializer().serialize(bsmd, dos);
-            }
-        }
-
-        public StreamRequestMessage deserialize(DataInputStream dis) throws IOException
-        {
-            int size = dis.readInt();
-            StreamRequestMetadata[] streamRequestMetadata = new StreamRequestMetadata[size];
-            for (int i = 0; i < size; ++i)
-            {
-                streamRequestMetadata[i] = StreamRequestMetadata.serializer().deserialize(dis);
-            }
-            return new StreamRequestMessage(streamRequestMetadata);
-        }
-    }
-}
+package org.apache.cassandra.streaming;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.*;
+
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+* This class encapsulates the message that needs to be sent to nodes
+* that handoff data. The message contains information about ranges
+* that need to be transferred and the target node.
+*/
+class StreamRequestMessage
+{
+   private static ICompactSerializer<StreamRequestMessage> serializer_;
+   static
+   {
+       serializer_ = new StreamRequestMessageSerializer();
+   }
+
+   protected static ICompactSerializer<StreamRequestMessage> serializer()
+   {
+       return serializer_;
+   }
+
+   protected static Message makeStreamRequestMessage(StreamRequestMessage streamRequestMessage)
+   {
+       ByteArrayOutputStream bos = new ByteArrayOutputStream();
+       DataOutputStream dos = new DataOutputStream(bos);
+       try
+       {
+           StreamRequestMessage.serializer().serialize(streamRequestMessage, dos);
+       }
+       catch (IOException e)
+       {
+           throw new IOError(e);
+       }
+       return new Message(FBUtilities.getLocalAddress(), StageManager.STREAM_STAGE, StorageService.Verb.STREAM_REQUEST, bos.toByteArray() );
+   }
+
+   protected StreamRequestMetadata[] streamRequestMetadata_ = new StreamRequestMetadata[0];
+
+   // TODO only actually ever need one BM, not an array
+   StreamRequestMessage(StreamRequestMetadata... streamRequestMetadata)
+   {
+       assert streamRequestMetadata != null;
+       streamRequestMetadata_ = streamRequestMetadata;
+   }
+
+    private static class StreamRequestMessageSerializer implements ICompactSerializer<StreamRequestMessage>
+    {
+        public void serialize(StreamRequestMessage streamRequestMessage, DataOutputStream dos) throws IOException
+        {
+            StreamRequestMetadata[] streamRequestMetadata = streamRequestMessage.streamRequestMetadata_;
+            dos.writeInt(streamRequestMetadata.length);
+            for (StreamRequestMetadata bsmd : streamRequestMetadata)
+            {
+                StreamRequestMetadata.serializer().serialize(bsmd, dos);
+            }
+        }
+
+        public StreamRequestMessage deserialize(DataInputStream dis) throws IOException
+        {
+            int size = dis.readInt();
+            StreamRequestMetadata[] streamRequestMetadata = new StreamRequestMetadata[size];
+            for (int i = 0; i < size; ++i)
+            {
+                streamRequestMetadata[i] = StreamRequestMetadata.serializer().deserialize(dis);
+            }
+            return new StreamRequestMessage(streamRequestMetadata);
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
 package org.apache.cassandra.streaming;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;