You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/02/24 00:07:03 UTC
svn commit: r915575 [2/3] - in /incubator/cassandra/trunk: ./
contrib/word_count/src/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/auth/ src/java/org/apache/cassandra/avro/
src/java/org/apache/cassandra/cache/ sr...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Bounds.java Tue Feb 23 23:07:01 2010
@@ -1,73 +1,94 @@
-package org.apache.cassandra.dht;
-
-import java.util.*;
-
-import org.apache.cassandra.service.StorageService;
-
-public class Bounds extends AbstractBounds
-{
- public Bounds(Token left, Token right)
- {
- this(left, right, StorageService.getPartitioner());
- }
-
- Bounds(Token left, Token right, IPartitioner partitioner)
- {
- super(left, right, partitioner);
- // unlike a Range, a Bounds may not wrap
- assert left.compareTo(right) <= 0 || right.equals(partitioner.getMinimumToken()) : "[" + left + "," + right + "]";
- }
-
- @Override
- public boolean contains(Token token)
- {
- return Range.contains(left, right, token) || left.equals(token);
- }
-
- public Set<AbstractBounds> restrictTo(Range range)
- {
- Token min = partitioner.getMinimumToken();
-
- // special case Bounds where left=right (single Token)
- if (this.left.equals(this.right) && !this.right.equals(min))
- return range.contains(this.left)
- ? Collections.unmodifiableSet(new HashSet<AbstractBounds>(Arrays.asList(this)))
- : Collections.<AbstractBounds>emptySet();
-
- // get the intersection of a Range w/ same left & right
- Set<Range> ranges = range.intersectionWith(new Range(this.left, this.right));
- // if range doesn't contain left token anyway, that's the correct answer
- if (!range.contains(this.left))
- return (Set) ranges;
- // otherwise, add back in the left token
- Set<AbstractBounds> S = new HashSet<AbstractBounds>(ranges.size());
- for (Range restricted : ranges)
- {
- if (restricted.left.equals(this.left))
- S.add(new Bounds(restricted.left, restricted.right));
- else
- S.add(restricted);
- }
- return Collections.unmodifiableSet(S);
- }
-
- public List<AbstractBounds> unwrap()
- {
- // Bounds objects never wrap
- return (List)Arrays.asList(this);
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (!(o instanceof Bounds))
- return false;
- Bounds rhs = (Bounds)o;
- return left.equals(rhs.left) && right.equals(rhs.right);
- }
-
- public String toString()
- {
- return "[" + left + "," + right + "]";
- }
-}
+package org.apache.cassandra.dht;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.*;
+
+import org.apache.cassandra.service.StorageService;
+
+public class Bounds extends AbstractBounds
+{
+ public Bounds(Token left, Token right)
+ {
+ this(left, right, StorageService.getPartitioner());
+ }
+
+ Bounds(Token left, Token right, IPartitioner partitioner)
+ {
+ super(left, right, partitioner);
+ // unlike a Range, a Bounds may not wrap
+ assert left.compareTo(right) <= 0 || right.equals(partitioner.getMinimumToken()) : "[" + left + "," + right + "]";
+ }
+
+ @Override
+ public boolean contains(Token token)
+ {
+ return Range.contains(left, right, token) || left.equals(token);
+ }
+
+ public Set<AbstractBounds> restrictTo(Range range)
+ {
+ Token min = partitioner.getMinimumToken();
+
+ // special case Bounds where left=right (single Token)
+ if (this.left.equals(this.right) && !this.right.equals(min))
+ return range.contains(this.left)
+ ? Collections.unmodifiableSet(new HashSet<AbstractBounds>(Arrays.asList(this)))
+ : Collections.<AbstractBounds>emptySet();
+
+ // get the intersection of a Range w/ same left & right
+ Set<Range> ranges = range.intersectionWith(new Range(this.left, this.right));
+ // if range doesn't contain left token anyway, that's the correct answer
+ if (!range.contains(this.left))
+ return (Set) ranges;
+ // otherwise, add back in the left token
+ Set<AbstractBounds> S = new HashSet<AbstractBounds>(ranges.size());
+ for (Range restricted : ranges)
+ {
+ if (restricted.left.equals(this.left))
+ S.add(new Bounds(restricted.left, restricted.right));
+ else
+ S.add(restricted);
+ }
+ return Collections.unmodifiableSet(S);
+ }
+
+ public List<AbstractBounds> unwrap()
+ {
+ // Bounds objects never wrap
+ return (List)Arrays.asList(this);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof Bounds))
+ return false;
+ Bounds rhs = (Bounds)o;
+ return left.equals(rhs.left) && right.equals(rhs.right);
+ }
+
+ public String toString()
+ {
+ return "[" + left + "," + right + "]";
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
package org.apache.cassandra.hadoop;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import java.io.IOException;
import java.net.InetAddress;
@@ -226,4 +247,4 @@
{
return new ColumnFamilyRecordReader();
}
-}
\ No newline at end of file
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
package org.apache.cassandra.hadoop;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import java.io.IOException;
import java.net.InetAddress;
@@ -188,4 +209,4 @@
{
return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp);
}
-}
\ No newline at end of file
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
package org.apache.cassandra.hadoop;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import java.io.DataInput;
import java.io.DataOutput;
@@ -132,4 +153,4 @@
w.readFields(in);
return w;
}
-}
\ No newline at end of file
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java Tue Feb 23 23:07:01 2010
@@ -1,67 +1,88 @@
-package org.apache.cassandra.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-public class DeletionService
-{
- public static final int MAX_RETRIES = 10;
-
- public static final ExecutorService executor = new JMXEnabledThreadPoolExecutor("FILEUTILS-DELETE-POOL");
-
- public static void submitDelete(final String file)
- {
- Runnable deleter = new WrappedRunnable()
- {
- @Override
- protected void runMayThrow() throws IOException
- {
- FileUtils.deleteWithConfirm(new File(file));
- }
- };
- executor.submit(deleter);
- }
-
- public static void submitDeleteWithRetry(String file)
- {
- submitDeleteWithRetry(file, 0);
- }
-
- private static void submitDeleteWithRetry(final String file, final int retryCount)
- {
- Runnable deleter = new WrappedRunnable()
- {
- @Override
- protected void runMayThrow() throws IOException
- {
- if (!new File(file).delete())
- {
- if (retryCount > MAX_RETRIES)
- throw new IOException("Unable to delete " + file + " after " + MAX_RETRIES + " tries");
- new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- Thread.sleep(10000);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- submitDeleteWithRetry(file, retryCount + 1);
- }
- }, "Delete submission: " + file).start();
- }
- }
- };
- executor.submit(deleter);
- }
-}
+package org.apache.cassandra.io;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+public class DeletionService
+{
+ public static final int MAX_RETRIES = 10;
+
+ public static final ExecutorService executor = new JMXEnabledThreadPoolExecutor("FILEUTILS-DELETE-POOL");
+
+ public static void submitDelete(final String file)
+ {
+ Runnable deleter = new WrappedRunnable()
+ {
+ @Override
+ protected void runMayThrow() throws IOException
+ {
+ FileUtils.deleteWithConfirm(new File(file));
+ }
+ };
+ executor.submit(deleter);
+ }
+
+ public static void submitDeleteWithRetry(String file)
+ {
+ submitDeleteWithRetry(file, 0);
+ }
+
+ private static void submitDeleteWithRetry(final String file, final int retryCount)
+ {
+ Runnable deleter = new WrappedRunnable()
+ {
+ @Override
+ protected void runMayThrow() throws IOException
+ {
+ if (!new File(file).delete())
+ {
+ if (retryCount > MAX_RETRIES)
+ throw new IOException("Unable to delete " + file + " after " + MAX_RETRIES + " tries");
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(10000);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ submitDeleteWithRetry(file, retryCount + 1);
+ }
+ }, "Delete submission: " + file).start();
+ }
+ }
+ };
+ executor.submit(deleter);
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableDeletingReference.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableDeletingReference.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableDeletingReference.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableDeletingReference.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
package org.apache.cassandra.io;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import java.io.File;
import java.io.IOError;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java Tue Feb 23 23:07:01 2010
@@ -1,18 +1,39 @@
-package org.apache.cassandra.io.util;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.io.Closeable;
-
-public interface FileDataInput extends DataInput, Closeable
-{
- public String getPath();
-
- public boolean isEOF() throws IOException;
-
- public void mark();
-
- public void reset() throws IOException;
-
- public int bytesPastMark();
-}
+package org.apache.cassandra.io.util;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.Closeable;
+
+public interface FileDataInput extends DataInput, Closeable
+{
+ public String getPath();
+
+ public boolean isEOF() throws IOException;
+
+ public void mark();
+
+ public void reset() throws IOException;
+
+ public int bytesPastMark();
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java Tue Feb 23 23:07:01 2010
@@ -1,404 +1,425 @@
-package org.apache.cassandra.io.util;
-
-import java.nio.MappedByteBuffer;
-import java.io.*;
-
-public class MappedFileDataInput extends InputStream implements FileDataInput
-{
- private final MappedByteBuffer buffer;
- private final String filename;
- private int position;
- private int markedPosition;
-
- public MappedFileDataInput(MappedByteBuffer buffer, String filename)
- {
- this(buffer, filename, 0);
- }
-
- public MappedFileDataInput(MappedByteBuffer buffer, String filename, int position)
- {
- assert buffer != null;
- this.buffer = buffer;
- this.filename = filename;
- this.position = position;
- }
-
- // don't make this public, this is only for seeking WITHIN the current mapped segment
- private void seekInternal(int pos) throws IOException
- {
- position = pos;
- }
-
- @Override
- public boolean markSupported()
- {
- return true;
- }
-
- @Override
- public void mark(int ignored)
- {
- markedPosition = position;
- }
-
- @Override
- public void reset() throws IOException
- {
- seekInternal(markedPosition);
- }
-
- public void mark()
- {
- mark(-1);
- }
-
- public int bytesPastMark()
- {
- assert position >= markedPosition;
- return position - markedPosition;
- }
-
- public boolean isEOF() throws IOException
- {
- return position == buffer.capacity();
- }
-
- public String getPath()
- {
- return filename;
- }
-
- public int read() throws IOException
- {
- if (isEOF())
- return -1;
- return buffer.get(position++) & 0xFF;
- }
-
- public int skipBytes(int n) throws IOException
- {
- if (n <= 0)
- return 0;
- int oldPosition = position;
- assert ((long)oldPosition) + n <= Integer.MAX_VALUE;
- position = Math.min(buffer.capacity(), position + n);
- return position - oldPosition;
- }
-
- /*
- !! DataInput methods below are copied from the implementation in Apache Harmony RandomAccessFile.
- */
-
- /**
- * Reads a boolean from the current position in this file. Blocks until one
- * byte has been read, the end of the file is reached or an exception is
- * thrown.
- *
- * @return the next boolean value from this file.
- * @throws EOFException
- * if the end of this file is detected.
- * @throws IOException
- * if this file is closed or another I/O error occurs.
- */
- public final boolean readBoolean() throws IOException {
- int temp = this.read();
- if (temp < 0) {
- throw new EOFException();
- }
- return temp != 0;
- }
-
- /**
- * Reads an 8-bit byte from the current position in this file. Blocks until
- * one byte has been read, the end of the file is reached or an exception is
- * thrown.
- *
- * @return the next signed 8-bit byte value from this file.
- * @throws EOFException
- * if the end of this file is detected.
- * @throws IOException
- * if this file is closed or another I/O error occurs.
- */
- public final byte readByte() throws IOException {
- int temp = this.read();
- if (temp < 0) {
- throw new EOFException();
- }
- return (byte) temp;
- }
-
- /**
- * Reads a 16-bit character from the current position in this file. Blocks until
- * two bytes have been read, the end of the file is reached or an exception is
- * thrown.
- *
- * @return the next char value from this file.
- * @throws EOFException
- * if the end of this file is detected.
- * @throws IOException
- * if this file is closed or another I/O error occurs.
- */
- public final char readChar() throws IOException {
- byte[] buffer = new byte[2];
- if (read(buffer, 0, buffer.length) != buffer.length) {
- throw new EOFException();
- }
- return (char) (((buffer[0] & 0xff) << 8) + (buffer[1] & 0xff));
- }
-
- /**
- * Reads a 64-bit double from the current position in this file. Blocks
- * until eight bytes have been read, the end of the file is reached or an
- * exception is thrown.
- *
- * @return the next double value from this file.
- * @throws EOFException
- * if the end of this file is detected.
- * @throws IOException
- * if this file is closed or another I/O error occurs.
- */
- public final double readDouble() throws IOException {
- return Double.longBitsToDouble(readLong());
- }
-
- /**
- * Reads a 32-bit float from the current position in this file. Blocks
- * until four bytes have been read, the end of the file is reached or an
- * exception is thrown.
- *
- * @return the next float value from this file.
- * @throws EOFException
- * if the end of this file is detected.
- * @throws IOException
- * if this file is closed or another I/O error occurs.
- */
- public final float readFloat() throws IOException {
- return Float.intBitsToFloat(readInt());
- }
-
- /**
- * Reads bytes from this file into {@code buffer}. Blocks until {@code
- * buffer.length} number of bytes have been read, the end of the file is
- * reached or an exception is thrown.
- *
- * @param buffer
- * the buffer to read bytes into.
- * @throws EOFException
- * if the end of this file is detected.
- * @throws IOException
- * if this file is closed or another I/O error occurs.
- * @throws NullPointerException
- * if {@code buffer} is {@code null}.
- */
- public final void readFully(byte[] buffer) throws IOException {
- readFully(buffer, 0, buffer.length);
- }
-
- /**
- * Read bytes from this file into {@code buffer} starting at offset {@code
- * offset}. This method blocks until {@code count} number of bytes have been
- * read.
- *
- * @param buffer
- * the buffer to read bytes into.
- * @param offset
- * the initial position in {@code buffer} to store the bytes read
- * from this file.
- * @param count
- * the maximum number of bytes to store in {@code buffer}.
- * @throws EOFException
- * if the end of this file is detected.
- * @throws IndexOutOfBoundsException
- * if {@code offset < 0} or {@code count < 0}, or if {@code
- * offset + count} is greater than the length of {@code buffer}.
- * @throws IOException
- * if this file is closed or another I/O error occurs.
- * @throws NullPointerException
- * if {@code buffer} is {@code null}.
- */
- public final void readFully(byte[] buffer, int offset, int count)
- throws IOException {
- if (buffer == null) {
- throw new NullPointerException();
- }
- // avoid int overflow
- if (offset < 0 || offset > buffer.length || count < 0
- || count > buffer.length - offset) {
- throw new IndexOutOfBoundsException();
- }
- while (count > 0) {
- int result = read(buffer, offset, count);
- if (result < 0) {
- throw new EOFException();
- }
- offset += result;
- count -= result;
- }
- }
-
- /**
- * Reads a 32-bit integer from the current position in this file. Blocks
- * until four bytes have been read, the end of the file is reached or an
- * exception is thrown.
- *
- * @return the next int value from this file.
- * @throws EOFException
- * if the end of this file is detected.
- * @throws IOException
- * if this file is closed or another I/O error occurs.
- */
- public final int readInt() throws IOException {
- byte[] buffer = new byte[4];
- if (read(buffer, 0, buffer.length) != buffer.length) {
- throw new EOFException();
- }
- return ((buffer[0] & 0xff) << 24) + ((buffer[1] & 0xff) << 16)
- + ((buffer[2] & 0xff) << 8) + (buffer[3] & 0xff);
- }
-
- /**
- * Reads a line of text form the current position in this file. A line is
- * represented by zero or more characters followed by {@code '\n'}, {@code
- * '\r'}, {@code "\r\n"} or the end of file marker. The string does not
- * include the line terminating sequence.
- * <p>
- * Blocks until a line terminating sequence has been read, the end of the
- * file is reached or an exception is thrown.
- *
- * @return the contents of the line or {@code null} if no characters have
- * been read before the end of the file has been reached.
- * @throws IOException
- * if this file is closed or another I/O error occurs.
- */
- public final String readLine() throws IOException {
- StringBuilder line = new StringBuilder(80); // Typical line length
- boolean foundTerminator = false;
- int unreadPosition = 0;
- while (true) {
- int nextByte = read();
- switch (nextByte) {
- case -1:
- return line.length() != 0 ? line.toString() : null;
- case (byte) '\r':
- if (foundTerminator) {
- seekInternal(unreadPosition);
- return line.toString();
- }
- foundTerminator = true;
- /* Have to be able to peek ahead one byte */
- unreadPosition = position;
- break;
- case (byte) '\n':
- return line.toString();
- default:
- if (foundTerminator) {
- seekInternal(unreadPosition);
- return line.toString();
- }
- line.append((char) nextByte);
- }
- }
- }
-
- /**
- * Reads a 64-bit long from the current position in this file. Blocks until
- * eight bytes have been read, the end of the file is reached or an
- * exception is thrown.
- *
- * @return the next long value from this file.
- * @throws EOFException
- * if the end of this file is detected.
- * @throws IOException
- * if this file is closed or another I/O error occurs.
- */
- public final long readLong() throws IOException {
- byte[] buffer = new byte[8];
- int n = read(buffer, 0, buffer.length);
- if (n != buffer.length) {
- throw new EOFException("expected 8 bytes; read " + n + " at final position " + position);
- }
- return ((long) (((buffer[0] & 0xff) << 24) + ((buffer[1] & 0xff) << 16)
- + ((buffer[2] & 0xff) << 8) + (buffer[3] & 0xff)) << 32)
- + ((long) (buffer[4] & 0xff) << 24)
- + ((buffer[5] & 0xff) << 16)
- + ((buffer[6] & 0xff) << 8)
- + (buffer[7] & 0xff);
- }
-
- /**
- * Reads a 16-bit short from the current position in this file. Blocks until
- * two bytes have been read, the end of the file is reached or an exception
- * is thrown.
- *
- * @return the next short value from this file.
- * @throws EOFException
- * if the end of this file is detected.
- * @throws IOException
- * if this file is closed or another I/O error occurs.
- */
- public final short readShort() throws IOException {
- byte[] buffer = new byte[2];
- if (read(buffer, 0, buffer.length) != buffer.length) {
- throw new EOFException();
- }
- return (short) (((buffer[0] & 0xff) << 8) + (buffer[1] & 0xff));
- }
-
- /**
- * Reads an unsigned 8-bit byte from the current position in this file and
- * returns it as an integer. Blocks until one byte has been read, the end of
- * the file is reached or an exception is thrown.
- *
- * @return the next unsigned byte value from this file as an int.
- * @throws EOFException
- * if the end of this file is detected.
- * @throws IOException
- * if this file is closed or another I/O error occurs.
- */
- public final int readUnsignedByte() throws IOException {
- int temp = this.read();
- if (temp < 0) {
- throw new EOFException();
- }
- return temp;
- }
-
- /**
- * Reads an unsigned 16-bit short from the current position in this file and
- * returns it as an integer. Blocks until two bytes have been read, the end of
- * the file is reached or an exception is thrown.
- *
- * @return the next unsigned short value from this file as an int.
- * @throws EOFException
- * if the end of this file is detected.
- * @throws IOException
- * if this file is closed or another I/O error occurs.
- */
- public final int readUnsignedShort() throws IOException {
- byte[] buffer = new byte[2];
- if (read(buffer, 0, buffer.length) != buffer.length) {
- throw new EOFException();
- }
- return ((buffer[0] & 0xff) << 8) + (buffer[1] & 0xff);
- }
-
- /**
- * Reads a string that is encoded in {@link DataInput modified UTF-8} from
- * this file. The number of bytes that must be read for the complete string
- * is determined by the first two bytes read from the file. Blocks until all
- * required bytes have been read, the end of the file is reached or an
- * exception is thrown.
- *
- * @return the next string encoded in {@link DataInput modified UTF-8} from
- * this file.
- * @throws EOFException
- * if the end of this file is detected.
- * @throws IOException
- * if this file is closed or another I/O error occurs.
- * @throws UTFDataFormatException
- * if the bytes read cannot be decoded into a character string.
- */
- public final String readUTF() throws IOException {
- return DataInputStream.readUTF(this);
- }
-}
+package org.apache.cassandra.io.util;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.nio.MappedByteBuffer;
+import java.io.*;
+
+public class MappedFileDataInput extends InputStream implements FileDataInput
+{
+ private final MappedByteBuffer buffer;
+ private final String filename;
+ private int position;
+ private int markedPosition;
+
+ public MappedFileDataInput(MappedByteBuffer buffer, String filename)
+ {
+ this(buffer, filename, 0);
+ }
+
+ public MappedFileDataInput(MappedByteBuffer buffer, String filename, int position)
+ {
+ assert buffer != null;
+ this.buffer = buffer;
+ this.filename = filename;
+ this.position = position;
+ }
+
+ // don't make this public, this is only for seeking WITHIN the current mapped segment
+ private void seekInternal(int pos) throws IOException
+ {
+ position = pos;
+ }
+
+ @Override
+ public boolean markSupported()
+ {
+ return true;
+ }
+
+ @Override
+ public void mark(int ignored)
+ {
+ markedPosition = position;
+ }
+
+ @Override
+ public void reset() throws IOException
+ {
+ seekInternal(markedPosition);
+ }
+
+ public void mark()
+ {
+ mark(-1);
+ }
+
+ public int bytesPastMark()
+ {
+ assert position >= markedPosition;
+ return position - markedPosition;
+ }
+
+ public boolean isEOF() throws IOException
+ {
+ return position == buffer.capacity();
+ }
+
+ public String getPath()
+ {
+ return filename;
+ }
+
+ public int read() throws IOException
+ {
+ if (isEOF())
+ return -1;
+ return buffer.get(position++) & 0xFF;
+ }
+
+ public int skipBytes(int n) throws IOException
+ {
+ if (n <= 0)
+ return 0;
+ int oldPosition = position;
+ assert ((long)oldPosition) + n <= Integer.MAX_VALUE;
+ position = Math.min(buffer.capacity(), position + n);
+ return position - oldPosition;
+ }
+
+ /*
+ !! DataInput methods below are copied from the implementation in Apache Harmony RandomAccessFile.
+ */
+
+ /**
+ * Reads a boolean from the current position in this file. Blocks until one
+ * byte has been read, the end of the file is reached or an exception is
+ * thrown.
+ *
+ * @return the next boolean value from this file.
+ * @throws EOFException
+ * if the end of this file is detected.
+ * @throws IOException
+ * if this file is closed or another I/O error occurs.
+ */
+ public final boolean readBoolean() throws IOException {
+ int temp = this.read();
+ if (temp < 0) {
+ throw new EOFException();
+ }
+ return temp != 0;
+ }
+
+ /**
+ * Reads an 8-bit byte from the current position in this file. Blocks until
+ * one byte has been read, the end of the file is reached or an exception is
+ * thrown.
+ *
+ * @return the next signed 8-bit byte value from this file.
+ * @throws EOFException
+ * if the end of this file is detected.
+ * @throws IOException
+ * if this file is closed or another I/O error occurs.
+ */
+ public final byte readByte() throws IOException {
+ int temp = this.read();
+ if (temp < 0) {
+ throw new EOFException();
+ }
+ return (byte) temp;
+ }
+
+ /**
+ * Reads a 16-bit character from the current position in this file. Blocks until
+ * two bytes have been read, the end of the file is reached or an exception is
+ * thrown.
+ *
+ * @return the next char value from this file.
+ * @throws EOFException
+ * if the end of this file is detected.
+ * @throws IOException
+ * if this file is closed or another I/O error occurs.
+ */
+ public final char readChar() throws IOException {
+ byte[] buffer = new byte[2];
+ if (read(buffer, 0, buffer.length) != buffer.length) {
+ throw new EOFException();
+ }
+ return (char) (((buffer[0] & 0xff) << 8) + (buffer[1] & 0xff));
+ }
+
+ /**
+ * Reads a 64-bit double from the current position in this file. Blocks
+ * until eight bytes have been read, the end of the file is reached or an
+ * exception is thrown.
+ *
+ * @return the next double value from this file.
+ * @throws EOFException
+ * if the end of this file is detected.
+ * @throws IOException
+ * if this file is closed or another I/O error occurs.
+ */
+ public final double readDouble() throws IOException {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ /**
+ * Reads a 32-bit float from the current position in this file. Blocks
+ * until four bytes have been read, the end of the file is reached or an
+ * exception is thrown.
+ *
+ * @return the next float value from this file.
+ * @throws EOFException
+ * if the end of this file is detected.
+ * @throws IOException
+ * if this file is closed or another I/O error occurs.
+ */
+ public final float readFloat() throws IOException {
+ return Float.intBitsToFloat(readInt());
+ }
+
+ /**
+ * Reads bytes from this file into {@code buffer}. Blocks until {@code
+ * buffer.length} number of bytes have been read, the end of the file is
+ * reached or an exception is thrown.
+ *
+ * @param buffer
+ * the buffer to read bytes into.
+ * @throws EOFException
+ * if the end of this file is detected.
+ * @throws IOException
+ * if this file is closed or another I/O error occurs.
+ * @throws NullPointerException
+ * if {@code buffer} is {@code null}.
+ */
+ public final void readFully(byte[] buffer) throws IOException {
+ readFully(buffer, 0, buffer.length);
+ }
+
+ /**
+ * Read bytes from this file into {@code buffer} starting at offset {@code
+ * offset}. This method blocks until {@code count} number of bytes have been
+ * read.
+ *
+ * @param buffer
+ * the buffer to read bytes into.
+ * @param offset
+ * the initial position in {@code buffer} to store the bytes read
+ * from this file.
+ * @param count
+ * the maximum number of bytes to store in {@code buffer}.
+ * @throws EOFException
+ * if the end of this file is detected.
+ * @throws IndexOutOfBoundsException
+ * if {@code offset < 0} or {@code count < 0}, or if {@code
+ * offset + count} is greater than the length of {@code buffer}.
+ * @throws IOException
+ * if this file is closed or another I/O error occurs.
+ * @throws NullPointerException
+ * if {@code buffer} is {@code null}.
+ */
+ public final void readFully(byte[] buffer, int offset, int count)
+ throws IOException {
+ if (buffer == null) {
+ throw new NullPointerException();
+ }
+ // avoid int overflow
+ if (offset < 0 || offset > buffer.length || count < 0
+ || count > buffer.length - offset) {
+ throw new IndexOutOfBoundsException();
+ }
+ while (count > 0) {
+ int result = read(buffer, offset, count);
+ if (result < 0) {
+ throw new EOFException();
+ }
+ offset += result;
+ count -= result;
+ }
+ }
+
+ /**
+ * Reads a 32-bit integer from the current position in this file. Blocks
+ * until four bytes have been read, the end of the file is reached or an
+ * exception is thrown.
+ *
+ * @return the next int value from this file.
+ * @throws EOFException
+ * if the end of this file is detected.
+ * @throws IOException
+ * if this file is closed or another I/O error occurs.
+ */
+ public final int readInt() throws IOException {
+ byte[] buffer = new byte[4];
+ if (read(buffer, 0, buffer.length) != buffer.length) {
+ throw new EOFException();
+ }
+ return ((buffer[0] & 0xff) << 24) + ((buffer[1] & 0xff) << 16)
+ + ((buffer[2] & 0xff) << 8) + (buffer[3] & 0xff);
+ }
+
+ /**
+ * Reads a line of text form the current position in this file. A line is
+ * represented by zero or more characters followed by {@code '\n'}, {@code
+ * '\r'}, {@code "\r\n"} or the end of file marker. The string does not
+ * include the line terminating sequence.
+ * <p>
+ * Blocks until a line terminating sequence has been read, the end of the
+ * file is reached or an exception is thrown.
+ *
+ * @return the contents of the line or {@code null} if no characters have
+ * been read before the end of the file has been reached.
+ * @throws IOException
+ * if this file is closed or another I/O error occurs.
+ */
+ public final String readLine() throws IOException {
+ StringBuilder line = new StringBuilder(80); // Typical line length
+ boolean foundTerminator = false;
+ int unreadPosition = 0;
+ while (true) {
+ int nextByte = read();
+ switch (nextByte) {
+ case -1:
+ return line.length() != 0 ? line.toString() : null;
+ case (byte) '\r':
+ if (foundTerminator) {
+ seekInternal(unreadPosition);
+ return line.toString();
+ }
+ foundTerminator = true;
+ /* Have to be able to peek ahead one byte */
+ unreadPosition = position;
+ break;
+ case (byte) '\n':
+ return line.toString();
+ default:
+ if (foundTerminator) {
+ seekInternal(unreadPosition);
+ return line.toString();
+ }
+ line.append((char) nextByte);
+ }
+ }
+ }
+
+ /**
+ * Reads a 64-bit long from the current position in this file. Blocks until
+ * eight bytes have been read, the end of the file is reached or an
+ * exception is thrown.
+ *
+ * @return the next long value from this file.
+ * @throws EOFException
+ * if the end of this file is detected.
+ * @throws IOException
+ * if this file is closed or another I/O error occurs.
+ */
+ public final long readLong() throws IOException {
+ byte[] buffer = new byte[8];
+ int n = read(buffer, 0, buffer.length);
+ if (n != buffer.length) {
+ throw new EOFException("expected 8 bytes; read " + n + " at final position " + position);
+ }
+ return ((long) (((buffer[0] & 0xff) << 24) + ((buffer[1] & 0xff) << 16)
+ + ((buffer[2] & 0xff) << 8) + (buffer[3] & 0xff)) << 32)
+ + ((long) (buffer[4] & 0xff) << 24)
+ + ((buffer[5] & 0xff) << 16)
+ + ((buffer[6] & 0xff) << 8)
+ + (buffer[7] & 0xff);
+ }
+
+ /**
+ * Reads a 16-bit short from the current position in this file. Blocks until
+ * two bytes have been read, the end of the file is reached or an exception
+ * is thrown.
+ *
+ * @return the next short value from this file.
+ * @throws EOFException
+ * if the end of this file is detected.
+ * @throws IOException
+ * if this file is closed or another I/O error occurs.
+ */
+ public final short readShort() throws IOException {
+ byte[] buffer = new byte[2];
+ if (read(buffer, 0, buffer.length) != buffer.length) {
+ throw new EOFException();
+ }
+ return (short) (((buffer[0] & 0xff) << 8) + (buffer[1] & 0xff));
+ }
+
+ /**
+ * Reads an unsigned 8-bit byte from the current position in this file and
+ * returns it as an integer. Blocks until one byte has been read, the end of
+ * the file is reached or an exception is thrown.
+ *
+ * @return the next unsigned byte value from this file as an int.
+ * @throws EOFException
+ * if the end of this file is detected.
+ * @throws IOException
+ * if this file is closed or another I/O error occurs.
+ */
+ public final int readUnsignedByte() throws IOException {
+ int temp = this.read();
+ if (temp < 0) {
+ throw new EOFException();
+ }
+ return temp;
+ }
+
+ /**
+ * Reads an unsigned 16-bit short from the current position in this file and
+ * returns it as an integer. Blocks until two bytes have been read, the end of
+ * the file is reached or an exception is thrown.
+ *
+ * @return the next unsigned short value from this file as an int.
+ * @throws EOFException
+ * if the end of this file is detected.
+ * @throws IOException
+ * if this file is closed or another I/O error occurs.
+ */
+ public final int readUnsignedShort() throws IOException {
+ byte[] buffer = new byte[2];
+ if (read(buffer, 0, buffer.length) != buffer.length) {
+ throw new EOFException();
+ }
+ return ((buffer[0] & 0xff) << 8) + (buffer[1] & 0xff);
+ }
+
+ /**
+ * Reads a string that is encoded in {@link DataInput modified UTF-8} from
+ * this file. The number of bytes that must be read for the complete string
+ * is determined by the first two bytes read from the file. Blocks until all
+ * required bytes have been read, the end of the file is reached or an
+ * exception is thrown.
+ *
+ * @return the next string encoded in {@link DataInput modified UTF-8} from
+ * this file.
+ * @throws EOFException
+ * if the end of this file is detected.
+ * @throws IOException
+ * if this file is closed or another I/O error occurs.
+ * @throws UTFDataFormatException
+ * if the bytes read cannot be decoded into a character string.
+ */
+ public final String readUTF() throws IOException {
+ return DataInputStream.readUTF(this);
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Tue Feb 23 23:07:01 2010
@@ -1,69 +1,90 @@
-package org.apache.cassandra.net;
-
-import java.io.*;
-import java.net.Socket;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.streaming.IncomingStreamReader;
-
-public class IncomingTcpConnection extends Thread
-{
- private static Logger logger = Logger.getLogger(IncomingTcpConnection.class);
-
- private final DataInputStream input;
- private Socket socket;
-
- public IncomingTcpConnection(Socket socket)
- {
- this.socket = socket;
- try
- {
- input = new DataInputStream(socket.getInputStream());
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- }
-
- @Override
- public void run()
- {
- while (true)
- {
- try
- {
- MessagingService.validateMagic(input.readInt());
- int header = input.readInt();
- int type = MessagingService.getBits(header, 1, 2);
- boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
- int version = MessagingService.getBits(header, 15, 8);
-
- if (isStream)
- {
- new IncomingStreamReader(socket.getChannel()).read();
- }
- else
- {
- int size = input.readInt();
- byte[] contentBytes = new byte[size];
- input.readFully(contentBytes);
- MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(new ByteArrayInputStream(contentBytes)));
- }
- }
- catch (EOFException e)
- {
- if (logger.isTraceEnabled())
- logger.trace("eof reading from socket; closing", e);
- break;
- }
- catch (IOException e)
- {
- if (logger.isDebugEnabled())
- logger.debug("error reading from socket; closing", e);
- break;
- }
- }
- }
-}
+package org.apache.cassandra.net;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.*;
+import java.net.Socket;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.streaming.IncomingStreamReader;
+
+public class IncomingTcpConnection extends Thread
+{
+ private static Logger logger = Logger.getLogger(IncomingTcpConnection.class);
+
+ private final DataInputStream input;
+ private Socket socket;
+
+ public IncomingTcpConnection(Socket socket)
+ {
+ this.socket = socket;
+ try
+ {
+ input = new DataInputStream(socket.getInputStream());
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ while (true)
+ {
+ try
+ {
+ MessagingService.validateMagic(input.readInt());
+ int header = input.readInt();
+ int type = MessagingService.getBits(header, 1, 2);
+ boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
+ int version = MessagingService.getBits(header, 15, 8);
+
+ if (isStream)
+ {
+ new IncomingStreamReader(socket.getChannel()).read();
+ }
+ else
+ {
+ int size = input.readInt();
+ byte[] contentBytes = new byte[size];
+ input.readFully(contentBytes);
+ MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(new ByteArrayInputStream(contentBytes)));
+ }
+ }
+ catch (EOFException e)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("eof reading from socket; closing", e);
+ break;
+ }
+ catch (IOException e)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("error reading from socket; closing", e);
+ break;
+ }
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
package org.apache.cassandra.net;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import java.io.DataOutputStream;
import java.io.IOException;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
package org.apache.cassandra.service;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import java.io.File;
import java.io.FileOutputStream;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java Tue Feb 23 23:07:01 2010
@@ -1,100 +1,121 @@
-package org.apache.cassandra.streaming;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-
-class CompletedFileStatus
-{
- private static ICompactSerializer<CompletedFileStatus> serializer_;
-
- public static enum StreamCompletionAction
- {
- DELETE,
- STREAM
- }
-
- static
- {
- serializer_ = new CompletedFileStatusSerializer();
- }
-
- public static ICompactSerializer<CompletedFileStatus> serializer()
- {
- return serializer_;
- }
-
- private String file_;
- private long expectedBytes_;
- private StreamCompletionAction action_;
-
- public CompletedFileStatus(String file, long expectedBytes)
- {
- file_ = file;
- expectedBytes_ = expectedBytes;
- action_ = StreamCompletionAction.DELETE;
- }
-
- public String getFile()
- {
- return file_;
- }
-
- public long getExpectedBytes()
- {
- return expectedBytes_;
- }
-
- public void setAction(StreamCompletionAction action)
- {
- action_ = action;
- }
-
- public StreamCompletionAction getAction()
- {
- return action_;
- }
-
- public Message makeStreamStatusMessage() throws IOException
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream( bos );
- CompletedFileStatus.serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_FINISHED, bos.toByteArray());
- }
-
- private static class CompletedFileStatusSerializer implements ICompactSerializer<CompletedFileStatus>
- {
- public void serialize(CompletedFileStatus streamStatus, DataOutputStream dos) throws IOException
- {
- dos.writeUTF(streamStatus.getFile());
- dos.writeLong(streamStatus.getExpectedBytes());
- dos.writeInt(streamStatus.getAction().ordinal());
- }
-
- public CompletedFileStatus deserialize(DataInputStream dis) throws IOException
- {
- String targetFile = dis.readUTF();
- long expectedBytes = dis.readLong();
- CompletedFileStatus streamStatus = new CompletedFileStatus(targetFile, expectedBytes);
-
- int ordinal = dis.readInt();
- if ( ordinal == StreamCompletionAction.DELETE.ordinal() )
- {
- streamStatus.setAction(StreamCompletionAction.DELETE);
- }
- else if ( ordinal == StreamCompletionAction.STREAM.ordinal() )
- {
- streamStatus.setAction(StreamCompletionAction.STREAM);
- }
-
- return streamStatus;
- }
- }
-}
+package org.apache.cassandra.streaming;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+class CompletedFileStatus
+{
+ private static ICompactSerializer<CompletedFileStatus> serializer_;
+
+ public static enum StreamCompletionAction
+ {
+ DELETE,
+ STREAM
+ }
+
+ static
+ {
+ serializer_ = new CompletedFileStatusSerializer();
+ }
+
+ public static ICompactSerializer<CompletedFileStatus> serializer()
+ {
+ return serializer_;
+ }
+
+ private String file_;
+ private long expectedBytes_;
+ private StreamCompletionAction action_;
+
+ public CompletedFileStatus(String file, long expectedBytes)
+ {
+ file_ = file;
+ expectedBytes_ = expectedBytes;
+ action_ = StreamCompletionAction.DELETE;
+ }
+
+ public String getFile()
+ {
+ return file_;
+ }
+
+ public long getExpectedBytes()
+ {
+ return expectedBytes_;
+ }
+
+ public void setAction(StreamCompletionAction action)
+ {
+ action_ = action;
+ }
+
+ public StreamCompletionAction getAction()
+ {
+ return action_;
+ }
+
+ public Message makeStreamStatusMessage() throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream( bos );
+ CompletedFileStatus.serializer().serialize(this, dos);
+ return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_FINISHED, bos.toByteArray());
+ }
+
+ private static class CompletedFileStatusSerializer implements ICompactSerializer<CompletedFileStatus>
+ {
+ public void serialize(CompletedFileStatus streamStatus, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(streamStatus.getFile());
+ dos.writeLong(streamStatus.getExpectedBytes());
+ dos.writeInt(streamStatus.getAction().ordinal());
+ }
+
+ public CompletedFileStatus deserialize(DataInputStream dis) throws IOException
+ {
+ String targetFile = dis.readUTF();
+ long expectedBytes = dis.readLong();
+ CompletedFileStatus streamStatus = new CompletedFileStatus(targetFile, expectedBytes);
+
+ int ordinal = dis.readInt();
+ if ( ordinal == StreamCompletionAction.DELETE.ordinal() )
+ {
+ streamStatus.setAction(StreamCompletionAction.DELETE);
+ }
+ else if ( ordinal == StreamCompletionAction.STREAM.ordinal() )
+ {
+ streamStatus.setAction(StreamCompletionAction.STREAM);
+ }
+
+ return streamStatus;
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
package org.apache.cassandra.streaming;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import java.io.DataInputStream;
import java.io.DataOutputStream;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
package org.apache.cassandra.streaming;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import java.io.File;
import java.io.IOException;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java Tue Feb 23 23:07:01 2010
@@ -1,48 +1,69 @@
-package org.apache.cassandra.streaming;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOError;
-import java.io.IOException;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.streaming.StreamOutManager;
-
-public class StreamFinishedVerbHandler implements IVerbHandler
-{
- private static Logger logger = Logger.getLogger(StreamFinishedVerbHandler.class);
-
- public void doVerb(Message message)
- {
- byte[] body = message.getMessageBody();
- ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-
- try
- {
- CompletedFileStatus streamStatus = CompletedFileStatus.serializer().deserialize(new DataInputStream(bufIn));
-
- switch (streamStatus.getAction())
- {
- case DELETE:
- StreamOutManager.get(message.getFrom()).finishAndStartNext(streamStatus.getFile());
- break;
-
- case STREAM:
- if (logger.isDebugEnabled())
- logger.debug("Need to re-stream file " + streamStatus.getFile());
- StreamOutManager.get(message.getFrom()).startNext();
- break;
-
- default:
- break;
- }
- }
- catch (IOException ex)
- {
- throw new IOError(ex);
- }
- }
-}
+package org.apache.cassandra.streaming;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOError;
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.streaming.StreamOutManager;
+
+/**
+ * Verb handler that decodes a {@link CompletedFileStatus} from the message
+ * body and advances the sender's {@link StreamOutManager} according to the
+ * status action.
+ */
+public class StreamFinishedVerbHandler implements IVerbHandler
+{
+    private static Logger logger = Logger.getLogger(StreamFinishedVerbHandler.class);
+
+    /**
+     * Handles one status message: DELETE finishes the named file and starts
+     * the next one, STREAM starts the next one without finishing.
+     *
+     * @param message the message whose body holds a serialized CompletedFileStatus
+     * @throws IOError if the body cannot be deserialized
+     */
+    public void doVerb(Message message)
+    {
+        DataInputStream in = new DataInputStream(new ByteArrayInputStream(message.getMessageBody()));
+
+        try
+        {
+            CompletedFileStatus status = CompletedFileStatus.serializer().deserialize(in);
+
+            switch (status.getAction())
+            {
+                case DELETE:
+                {
+                    StreamOutManager.get(message.getFrom()).finishAndStartNext(status.getFile());
+                    break;
+                }
+                case STREAM:
+                {
+                    if (logger.isDebugEnabled())
+                    {
+                        logger.debug("Need to re-stream file " + status.getFile());
+                    }
+                    StreamOutManager.get(message.getFrom()).startNext();
+                    break;
+                }
+            }
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
package org.apache.cassandra.streaming;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import java.net.InetAddress;
import java.util.Collection;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java Tue Feb 23 23:07:01 2010
@@ -1,19 +1,40 @@
-package org.apache.cassandra.streaming;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.streaming.StreamOutManager;
-
-public class StreamInitiateDoneVerbHandler implements IVerbHandler
-{
- private static Logger logger = Logger.getLogger(StreamInitiateDoneVerbHandler.class);
-
- public void doVerb(Message message)
- {
- if (logger.isDebugEnabled())
- logger.debug("Received a stream initiate done message ...");
- StreamOutManager.get(message.getFrom()).startNext();
- }
-}
+package org.apache.cassandra.streaming;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.streaming.StreamOutManager;
+
+public class StreamInitiateDoneVerbHandler implements IVerbHandler // handles the "stream initiate done" message
+{
+ private static Logger logger = Logger.getLogger(StreamInitiateDoneVerbHandler.class);
+
+ public void doVerb(Message message) // logs receipt at debug level, then calls startNext() on the StreamOutManager for message.getFrom()
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Received a stream initiate done message ...");
+ StreamOutManager.get(message.getFrom()).startNext(); // the message body is never read; only the origin returned by getFrom() is used
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
package org.apache.cassandra.streaming;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import java.io.*;
import java.net.InetAddress;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java Tue Feb 23 23:07:01 2010
@@ -1,76 +1,97 @@
-package org.apache.cassandra.streaming;
-
-import java.io.*;
-
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
-* This class encapsulates the message that needs to be sent to nodes
-* that handoff data. The message contains information about ranges
-* that need to be transferred and the target node.
-*/
-class StreamRequestMessage
-{
- private static ICompactSerializer<StreamRequestMessage> serializer_;
- static
- {
- serializer_ = new StreamRequestMessageSerializer();
- }
-
- protected static ICompactSerializer<StreamRequestMessage> serializer()
- {
- return serializer_;
- }
-
- protected static Message makeStreamRequestMessage(StreamRequestMessage streamRequestMessage)
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- try
- {
- StreamRequestMessage.serializer().serialize(streamRequestMessage, dos);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- return new Message(FBUtilities.getLocalAddress(), StageManager.STREAM_STAGE, StorageService.Verb.STREAM_REQUEST, bos.toByteArray() );
- }
-
- protected StreamRequestMetadata[] streamRequestMetadata_ = new StreamRequestMetadata[0];
-
- // TODO only actually ever need one BM, not an array
- StreamRequestMessage(StreamRequestMetadata... streamRequestMetadata)
- {
- assert streamRequestMetadata != null;
- streamRequestMetadata_ = streamRequestMetadata;
- }
-
- private static class StreamRequestMessageSerializer implements ICompactSerializer<StreamRequestMessage>
- {
- public void serialize(StreamRequestMessage streamRequestMessage, DataOutputStream dos) throws IOException
- {
- StreamRequestMetadata[] streamRequestMetadata = streamRequestMessage.streamRequestMetadata_;
- dos.writeInt(streamRequestMetadata.length);
- for (StreamRequestMetadata bsmd : streamRequestMetadata)
- {
- StreamRequestMetadata.serializer().serialize(bsmd, dos);
- }
- }
-
- public StreamRequestMessage deserialize(DataInputStream dis) throws IOException
- {
- int size = dis.readInt();
- StreamRequestMetadata[] streamRequestMetadata = new StreamRequestMetadata[size];
- for (int i = 0; i < size; ++i)
- {
- streamRequestMetadata[i] = StreamRequestMetadata.serializer().deserialize(dis);
- }
- return new StreamRequestMessage(streamRequestMetadata);
- }
- }
-}
+package org.apache.cassandra.streaming;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.*;
+
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+* This class encapsulates the message that needs to be sent to nodes
+* that hand off data. The message contains information about ranges
+* that need to be transferred and the target node.
+*/
+class StreamRequestMessage
+{
+ private static ICompactSerializer<StreamRequestMessage> serializer_; // single shared serializer, assigned once in the static initializer below
+ static
+ {
+ serializer_ = new StreamRequestMessageSerializer();
+ }
+
+ protected static ICompactSerializer<StreamRequestMessage> serializer() // accessor for the shared serializer instance
+ {
+ return serializer_;
+ }
+
+ protected static Message makeStreamRequestMessage(StreamRequestMessage streamRequestMessage) // serializes the request to bytes and wraps them in a Message
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ try
+ {
+ StreamRequestMessage.serializer().serialize(streamRequestMessage, dos);
+ }
+ catch (IOException e) // rethrown unchecked with the cause preserved; NOTE(review): presumably unreachable for an in-memory stream - confirm
+ {
+ throw new IOError(e);
+ }
+ return new Message(FBUtilities.getLocalAddress(), StageManager.STREAM_STAGE, StorageService.Verb.STREAM_REQUEST, bos.toByteArray() ); // built from the local address, STREAM_STAGE and the STREAM_REQUEST verb
+ }
+
+ protected StreamRequestMetadata[] streamRequestMetadata_ = new StreamRequestMetadata[0]; // empty default; the only constructor overwrites it
+
+ // TODO only actually ever need one BM, not an array
+ StreamRequestMessage(StreamRequestMetadata... streamRequestMetadata) // stores the caller's array as-is, without copying
+ {
+ assert streamRequestMetadata != null; // checked only when the JVM runs with assertions enabled
+ streamRequestMetadata_ = streamRequestMetadata;
+ }
+
+ private static class StreamRequestMessageSerializer implements ICompactSerializer<StreamRequestMessage>
+ {
+ public void serialize(StreamRequestMessage streamRequestMessage, DataOutputStream dos) throws IOException // writes an int element count, then each element in array order
+ {
+ StreamRequestMetadata[] streamRequestMetadata = streamRequestMessage.streamRequestMetadata_;
+ dos.writeInt(streamRequestMetadata.length);
+ for (StreamRequestMetadata bsmd : streamRequestMetadata)
+ {
+ StreamRequestMetadata.serializer().serialize(bsmd, dos);
+ }
+ }
+
+ public StreamRequestMessage deserialize(DataInputStream dis) throws IOException // mirror of serialize(): reads the count, then that many elements
+ {
+ int size = dis.readInt(); // NOTE(review): used unchecked to size the array; a negative value would throw NegativeArraySizeException
+ StreamRequestMetadata[] streamRequestMetadata = new StreamRequestMetadata[size];
+ for (int i = 0; i < size; ++i)
+ {
+ streamRequestMetadata[i] = StreamRequestMetadata.serializer().deserialize(dis);
+ }
+ return new StreamRequestMessage(streamRequestMetadata);
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java?rev=915575&r1=915574&r2=915575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMetadata.java Tue Feb 23 23:07:01 2010
@@ -1,4 +1,25 @@
package org.apache.cassandra.streaming;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import java.io.DataInputStream;
import java.io.DataOutputStream;