Posted to user@cassandra.apache.org by Aby Kuruvilla <ab...@envisagesystems.com> on 2015/03/04 16:11:32 UTC
Streaming failures during bulkloading data using CqlBulkOutputFormat
I am trying to use the CqlBulkOutputFormat in a Hadoop job to bulk load
data into Cassandra. I was not able to find any documentation for this new
output format, but from looking through the code, it uses
CQLSSTableWriter to write SSTable files to disk, which are then streamed
to Cassandra using SSTableLoader. When I run the Hadoop job, I can see that
the SSTable files do get generated, but the job fails to stream the data out.
I get the same exception when I try with a Cassandra node on "localhost" as
well as with a remote Cassandra cluster. I also get this exception on C*
versions 2.1.1, 2.1.2 and 2.1.3.
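Since the SSTables are written locally before anything is streamed, one way to rule out the writing step is to check that the output directory holds a complete, non-temporary set of components. The sketch below is a hypothetical helper, not part of Cassandra or CqlBulkOutputFormat. It assumes the 2.1 naming pattern visible in this job's log, `<keyspace>-<table>[-tmp]-<version>-<generation>-<component>` (e.g. `dev-participant-ka-1-Data.db`), and relies on Cassandra accepting only letters, digits and underscores in keyspace and table names, so splitting on `-` is safe. The expected component list is simply the set the log shows being renamed; it is not an authoritative list of required files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical helper (not part of Cassandra): checks which SSTable components a
 * bulk-load output directory contains, assuming 2.1-style names such as
 * "dev-participant-ka-1-Data.db" and temporary names such as
 * "dev-participant-tmp-ka-1-Data.db".
 */
public class SSTableFileCheck {

    // The components this job's log shows being renamed from tmp to final names.
    static final List<String> EXPECTED = Arrays.asList(
            "Filter.db", "Digest.sha1", "Statistics.db", "Index.db",
            "CompressionInfo.db", "TOC.txt", "Data.db");

    /** True if the name still carries the "-tmp-" marker, i.e. it was never renamed. */
    static boolean isTemporary(String fileName) {
        return fileName.contains("-tmp-");
    }

    /** Lists the expected components that are missing for one keyspace/table/generation. */
    static List<String> missingComponents(List<String> fileNames, String keyspace,
                                          String table, int generation) {
        Set<String> present = new HashSet<String>();
        for (String name : fileNames) {
            if (isTemporary(name)) {
                continue; // an unfinished file does not count as present
            }
            // <keyspace>-<table>-<version>-<generation>-<component>
            String[] parts = name.split("-", 5);
            if (parts.length == 5
                    && parts[0].equals(keyspace)
                    && parts[1].equals(table)
                    && parts[3].equals(Integer.toString(generation))) {
                present.add(parts[4]);
            }
        }
        List<String> missing = new ArrayList<String>();
        for (String component : EXPECTED) {
            if (!present.contains(component)) {
                missing.add(component);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> listing = Arrays.asList(
                "dev-participant-ka-1-Data.db",
                "dev-participant-ka-1-Index.db",
                "dev-participant-tmp-ka-1-TOC.txt");
        // prints [Filter.db, Digest.sha1, Statistics.db, CompressionInfo.db, TOC.txt]
        System.out.println(missingComponents(listing, "dev", "participant", 1));
    }
}
```

In a real check, the listing would come from something like `java.io.File#list()` on the per-table directory the writer uses (in the log, a `participant-<uuid>` directory under the system temp folder).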
*Relevant portion of logs and stack trace*
09:20:23.207 [Thread-6] WARN org.apache.cassandra.utils.CLibrary - JNA link failure, one or more native method will be unavailable.
09:20:23.208 [Thread-6] DEBUG org.apache.cassandra.utils.CLibrary - JNA link failure details: Error looking up function 'posix_fadvise': dlsym(0x7fff6ab8a5e0, posix_fadvise): symbol not found
09:20:23.504 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Filter.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Filter.db
09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Digest.sha1 to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Digest.sha1
09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Statistics.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Statistics.db
09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Index.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Index.db
09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-CompressionInfo.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-CompressionInfo.db
09:20:23.506 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-TOC.txt to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-TOC.txt
09:20:23.506 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Data.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Data.db
09:20:23.727 [Thread-2] DEBUG o.a.c.i.s.m.MetadataSerializer - Load metadata for /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1
09:20:23.729 [Thread-2] INFO o.a.c.io.sstable.SSTableReader - Opening /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1 (617874 bytes)
09:20:23.780 [Thread-2] INFO o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Executing streaming plan for Bulk Load
09:20:23.781 [StreamConnectionEstablisher:1] INFO o.a.c.streaming.StreamSession - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Starting streaming to /192.168.56.11
09:20:23.781 [StreamConnectionEstablisher:1] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending stream init for incoming stream
09:20:23.792 [StreamConnectionEstablisher:1] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending stream init for outgoing stream
09:20:23.794 [STREAM-OUT-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending Prepare (0 requests, 1 files}
09:20:23.795 [StreamConnectionEstablisher:1] INFO o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Prepare completed. Receiving 0 files(0 bytes), sending 1 files(617874 bytes)
09:20:23.799 [StreamConnectionEstablisher:1] INFO o.a.c.streaming.StreamCoordinator - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Beginning stream session with /192.168.56.11
09:20:23.799 [STREAM-OUT-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending File (Header (cfId: d6d35793-729c-3cab-bee0-84e971e48675, #0, version: ka, estimated keys: 3072, transfer size: 617874, compressed?: true, repairedAt: 0), file: /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Data.db)
09:20:23.808 [STREAM-IN-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Received Retry (d6d35793-729c-3cab-bee0-84e971e48675, #0)
09:20:23.808 [STREAM-IN-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Received Session Failed
09:20:23.809 [STREAM-IN-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Closing stream connection handler on /192.168.56.11
09:20:23.811 [STREAM-IN-/192.168.56.11] INFO o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Session with /192.168.56.11 is complete
09:20:23.812 [STREAM-IN-/192.168.56.11] WARN o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Stream failed
09:20:23.815 [STREAM-OUT-/192.168.56.11] ERROR o.a.c.streaming.StreamSession - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.7.0_51]
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.7.0_51]
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.7.0_51]
at sun.nio.ch.IOUtil.write(IOUtil.java:51) ~[na:1.7.0_51]
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487) ~[na:1.7.0_51]
at sun.nio.ch.FileChannelImpl.transferToTrustedChannel(FileChannelImpl.java:473) ~[na:1.7.0_51]
at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:569) ~[na:1.7.0_51]
at org.apache.cassandra.streaming.compress.CompressedStreamWriter.write(CompressedStreamWriter.java:74) ~[cassandra-all-2.1.2.jar:2.1.2]
at org.apache.cassandra.streaming.messages.OutgoingFileMessage$1.serialize(OutgoingFileMessage.java:56) ~[cassandra-all-2.1.2.jar:2.1.2]
at org.apache.cassandra.streaming.messages.OutgoingFileMessage$1.serialize(OutgoingFileMessage.java:40) ~[cassandra-all-2.1.2.jar:2.1.2]
at org.apache.cassandra.streaming.messages.StreamMessage.serialize(StreamMessage.java:45) ~[cassandra-all-2.1.2.jar:2.1.2]
at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.sendMessage(ConnectionHandler.java:346) [cassandra-all-2.1.2.jar:2.1.2]
at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:318) [cassandra-all-2.1.2.jar:2.1.2]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
09:20:23.816 [STREAM-OUT-/192.168.56.11] ERROR o.a.c.streaming.StreamSession - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.7.0_51]
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.7.0_51]
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.7.0_51]
at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[na:1.7.0_51]
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487) ~[na:1.7.0_51]
at org.apache.cassandra.io.util.DataOutputStreamAndChannel.write(DataOutputStreamAndChannel.java:48) ~[cassandra-all-2.1.2.jar:2.1.2]
at org.apache.cassandra.streaming.messages.StreamMessage.serialize(StreamMessage.java:44) ~[cassandra-all-2.1.2.jar:2.1.2]
at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.sendMessage(ConnectionHandler.java:346) [cassandra-all-2.1.2.jar:2.1.2]
at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:326) [cassandra-all-2.1.2.jar:2.1.2]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
----------------------------------------------------
Here is what I have tried
*Hadoop Driver*
import java.util.List;

import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CassandraBulkImporter extends Configured implements Tool {
    // .....

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new CassandraBulkImporter(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        // .......
        Job job = new Job(conf);
        // ......
        job.setOutputFormatClass(CqlBulkOutputFormat.class);
        ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "192.168.56.11");
        ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
        ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
        ConfigHelper.setOutputKeyspace(job.getConfiguration(), CASSANDRA_KEYSPACE_NAME);
        ConfigHelper.setOutputColumnFamily(
                job.getConfiguration(),
                CASSANDRA_KEYSPACE_NAME,
                CASSANDRA_TABLE_NAME
        );

        // Set the properties for CqlBulkOutputFormat
        MultipleOutputs.addNamedOutput(job,
                CASSANDRA_TABLE_NAME, CqlBulkOutputFormat.class, Object.class, List.class);
        CqlBulkOutputFormat.setColumnFamilySchema(job.getConfiguration(),
                CASSANDRA_TABLE_NAME, "CREATE TABLE dev.participant(........)");
        CqlBulkOutputFormat.setColumnFamilyInsertStatement(job.getConfiguration(),
                CASSANDRA_TABLE_NAME, "INSERT into dev.participant(........) values (?,?,?,?,?) ");
        // .....
    }
}
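Each `List<ByteBuffer>` the reducer emits is presumably handed to the underlying CQLSSTableWriter as the values for the INSERT statement's bind markers, in order, so a row must supply exactly one value per `?`. The stdlib sketch below is a hypothetical pre-flight check (not part of CqlBulkOutputFormat) that counts the markers once, so emitted rows can be size-checked. The column names in the example are placeholders because the real column list is elided in the post; the scanner only skips single-quoted string literals and does not handle comments or named markers (`:name`).

```java
/**
 * Hypothetical pre-flight check (not part of CqlBulkOutputFormat): counts the '?' bind
 * markers in an INSERT statement so each emitted List<ByteBuffer> can be size-checked.
 */
public class BindMarkerCount {

    /** Counts '?' characters that appear outside single-quoted CQL string literals. */
    static int countBindMarkers(String cql) {
        int count = 0;
        boolean inString = false;
        for (int i = 0; i < cql.length(); i++) {
            char c = cql.charAt(i);
            if (c == '\'') {
                // An escaped quote ('') toggles twice, leaving the state unchanged.
                inString = !inString;
            } else if (c == '?' && !inString) {
                count++;
            }
        }
        return count;
    }

    /** Throws if a row does not supply exactly one value per bind marker. */
    static void checkRowSize(String cql, int valueCount) {
        int expected = countBindMarkers(cql);
        if (valueCount != expected) {
            throw new IllegalArgumentException(
                    "expected " + expected + " values but got " + valueCount);
        }
    }

    public static void main(String[] args) {
        // Placeholder column names; the real column list is elided in the post.
        String insert = "INSERT INTO dev.participant (c1, c2, c3, c4, c5) VALUES (?,?,?,?,?)";
        System.out.println(countBindMarkers(insert)); // prints 5
        checkRowSize(insert, 5); // passes silently
    }
}
```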
*Reducer Code*
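import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class ReducerToCassandra extends Reducer<Text, Text, Object, List<ByteBuffer>> {
    private MultipleOutputs<Object, List<ByteBuffer>> multipleOutputs;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<Object, List<ByteBuffer>>(context);
    }

    @Override
    public void reduce(Text id, Iterable<Text> pInfo, Context context)
            throws IOException, InterruptedException {
        // ....
        List<ByteBuffer> bVariables = new ArrayList<ByteBuffer>();
        // .....
        // The name must match the one registered with MultipleOutputs.addNamedOutput in the driver
        multipleOutputs.write(CASSANDRA_TABLE1, null, bVariables);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close(); // closes the record writers of the named outputs
    }
}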
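The elided part of `reduce` has to turn each field into a `ByteBuffer` holding the serialized form Cassandra uses for that column's CQL type. The stdlib sketch below assumes a hypothetical five-column table `(id uuid, name text, age int, joined bigint, email text)`, since the real schema is elided, and encodes each value in Cassandra's native format for that type: UTF-8 bytes for `text`, 4-byte big-endian for `int`, 8-byte big-endian for `bigint`, and the 16 UUID bytes (most significant half first) for `uuid`. With the Cassandra jar on the classpath, the marshal types' `decompose` methods (e.g. `Int32Type.instance.decompose(...)`) should produce the same bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical encoders producing the per-column ByteBuffers a reducer could emit. */
public class BindValues {

    static ByteBuffer text(String value) {
        return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
    }

    static ByteBuffer int32(int value) {
        ByteBuffer buf = ByteBuffer.allocate(4); // big-endian by default
        buf.putInt(0, value);                    // absolute put keeps position at 0
        return buf;
    }

    static ByteBuffer bigint(long value) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putLong(0, value);
        return buf;
    }

    static ByteBuffer uuid(UUID value) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(0, value.getMostSignificantBits());
        buf.putLong(8, value.getLeastSignificantBits());
        return buf;
    }

    /** One row for the hypothetical (id, name, age, joined, email) table. */
    static List<ByteBuffer> row(UUID id, String name, int age, long joinedMillis, String email) {
        List<ByteBuffer> values = new ArrayList<ByteBuffer>();
        values.add(uuid(id)); // order must match the ?s in the INSERT statement
        values.add(text(name));
        values.add(int32(age));
        values.add(bigint(joinedMillis));
        values.add(text(email));
        return values;
    }

    public static void main(String[] args) {
        List<ByteBuffer> bVariables =
                row(UUID.randomUUID(), "Ada", 36, 1425478823000L, "ada@example.com");
        System.out.println(bVariables.size()); // prints 5
    }
}
```

Absolute `put` calls are used so every buffer is returned with position 0 and its full contents readable, without needing a `flip()`.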
Re: Streaming failures during bulkloading data using CqlBulkOutputFormat
Posted by Yuki Morishita <mo...@gmail.com>.
Thanks!
On Thu, Mar 5, 2015 at 11:10 AM, Aby Kuruvilla
<ab...@envisagesystems.com> wrote:
> Thanks Yuki, have created a JIRA ticket
>
> https://issues.apache.org/jira/browse/CASSANDRA-8924
>
> On Thu, Mar 5, 2015 at 10:34 AM, Yuki Morishita <mo...@gmail.com> wrote:
>>
>> Thanks.
>> It looks like a bug. Can you create a ticket on JIRA?
>>
>> https://issues.apache.org/jira/browse/CASSANDRA
>>
>> On Thu, Mar 5, 2015 at 7:56 AM, Aby Kuruvilla
>> <ab...@envisagesystems.com> wrote:
>> > Hi Yuki
>> >
>> > Thanks for the reply!
>> >
>> > Here is the log from the Cassandra server for the stream failure
>> >
>> > INFO [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816 StreamResultFuture.java:109 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Creating new streaming plan for Bulk Load
>> > INFO [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816 StreamResultFuture.java:116 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Received streaming plan for Bulk Load
>> > INFO [STREAM-INIT-/192.168.56.1:58579] 2015-03-04 09:20:23,819 StreamResultFuture.java:116 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Received streaming plan for Bulk Load
>> > INFO [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,822 StreamResultFuture.java:166 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Prepare completed. Receiving 1 files(617874 bytes), sending 0 files(0 bytes)
>> > WARN [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,823 StreamSession.java:597 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Retrying for following error
>> > java.io.IOException: CF d6d35793-729c-3cab-bee0-84e971e48675 was dropped during streaming
>> > at org.apache.cassandra.streaming.compress.CompressedStreamReader.read(CompressedStreamReader.java:71) ~[main/:na]
>> > at org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:48) [main/:na]
>> > at org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:38) [main/:na]
>> > at org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:55) [main/:na]
>> > at org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245) [main/:na]
>> > at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>> > ERROR [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,828 StreamSession.java:477 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
>> > java.lang.IllegalArgumentException: Unknown type 0
>> > at org.apache.cassandra.streaming.messages.StreamMessage$Type.get(StreamMessage.java:89) ~[main/:na]
>> > at org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:54) ~[main/:na]
>> > at org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245) ~[main/:na]
>> > at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>> > INFO [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829 StreamResultFuture.java:180 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Session with /127.0.0.1 is complete
>> > WARN [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829 StreamResultFuture.java:207 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Stream failed
>> >
>> > On Wed, Mar 4, 2015 at 1:18 PM, Yuki Morishita <mo...@gmail.com> wrote:
>> >>
>> >> Do you have a corresponding error on the other side of the stream
>> >> (/192.168.56.11)?
>> >>
>> >>
>> >> On Wed, Mar 4, 2015 at 9:11 AM, Aby Kuruvilla
>> >> <ab...@envisagesystems.com> wrote:
>> >> --
>> >> Yuki Morishita
>> >> t:yukim (http://twitter.com/yukim)
>> >
>> >
>>
>>
>>
>> --
>> Yuki Morishita
>> t:yukim (http://twitter.com/yukim)
>
>
--
Yuki Morishita
t:yukim (http://twitter.com/yukim)
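The server-side error quoted in this thread, `CF d6d35793-729c-3cab-bee0-84e971e48675 was dropped during streaming`, is raised while the receiver reads the incoming file. The file header carries a table id (`cfId`), and the wording indicates that the receiving node had no table registered under that id. The stdlib sketch below is a simplified, hypothetical model of that lookup, not Cassandra's code: the map stands in for the node's schema, and the id it registers is made up. It also reads the versions of the two UUIDs in the logs with `java.util.UUID.version()`. The stream id is a version-1 (time-based) UUID, while the header's `cfId` is version 3 (name-based). That hints, though the thread does not confirm it, that the writer derived the table id from names rather than using the id of the existing table on the server.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Simplified, hypothetical model (not Cassandra's code) of a receiving node resolving
 * the table id (cfId) carried in an incoming file header.
 */
public class CfIdCheck {

    /** True if the local schema knows a table with this id. */
    static boolean isKnown(Map<UUID, String> schemaById, UUID cfId) {
        return schemaById.containsKey(cfId);
    }

    /** Resolves the table, failing with the same wording as the quoted server log. */
    static String resolve(Map<UUID, String> schemaById, UUID cfId) throws IOException {
        if (!isKnown(schemaById, cfId)) {
            throw new IOException("CF " + cfId + " was dropped during streaming");
        }
        return schemaById.get(cfId);
    }

    /** java.util.UUID version: 1 = time-based, 3 = name-based (MD5), 4 = random. */
    static int versionOf(String uuid) {
        return UUID.fromString(uuid).version();
    }

    public static void main(String[] args) {
        System.out.println(versionOf("98ba8730-c279-11e4-b8e9-55374d280508")); // prints 1 (stream id)
        System.out.println(versionOf("d6d35793-729c-3cab-bee0-84e971e48675")); // prints 3 (header cfId)

        // Hypothetical schema in which dev.participant is registered under a different id.
        Map<UUID, String> schema = new HashMap<UUID, String>();
        schema.put(UUID.fromString("00000000-0000-1000-8000-000000000000"), "dev.participant");
        try {
            resolve(schema, UUID.fromString("d6d35793-729c-3cab-bee0-84e971e48675"));
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```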
Re: Streaming failures during bulkloading data using CqlBulkOutputFormat
Posted by Aby Kuruvilla <ab...@envisagesystems.com>.
Thanks Yuki, have created a JIRA ticket
https://issues.apache.org/jira/browse/CASSANDRA-8924
On Thu, Mar 5, 2015 at 10:34 AM, Yuki Morishita <mo...@gmail.com> wrote:
> Thanks.
> It looks like a bug. Can you create a ticket on JIRA?
>
> https://issues.apache.org/jira/browse/CASSANDRA
>
> On Thu, Mar 5, 2015 at 7:56 AM, Aby Kuruvilla
> <ab...@envisagesystems.com> wrote:
> > Hi Yuki
> >
> > Thanks for the reply!
> >
> > Here is the log from Cassandra server for the stream failure
> >
> > INFO [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816
> > StreamResultFuture.java:109 - [Stream
> #98ba8730-c279-11e4-b8e9-55374d280508
> > ID#0] Creating new streaming plan for Bulk Load
> > INFO [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816 StreamResultFuture.java:116 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Received streaming plan for Bulk Load
> > INFO [STREAM-INIT-/192.168.56.1:58579] 2015-03-04 09:20:23,819 StreamResultFuture.java:116 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Received streaming plan for Bulk Load
> > INFO [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,822 StreamResultFuture.java:166 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Prepare completed. Receiving 1 files(617874 bytes), sending 0 files(0 bytes)
> > WARN [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,823 StreamSession.java:597 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Retrying for following error
> > java.io.IOException: CF d6d35793-729c-3cab-bee0-84e971e48675 was dropped during streaming
> > at org.apache.cassandra.streaming.compress.CompressedStreamReader.read(CompressedStreamReader.java:71) ~[main/:na]
> > at org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:48) [main/:na]
> > at org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:38) [main/:na]
> > at org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:55) [main/:na]
> > at org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245) [main/:na]
> > at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
> > ERROR [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,828 StreamSession.java:477 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
> > java.lang.IllegalArgumentException: Unknown type 0
> > at org.apache.cassandra.streaming.messages.StreamMessage$Type.get(StreamMessage.java:89) ~[main/:na]
> > at org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:54) ~[main/:na]
> > at org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245) ~[main/:na]
> > at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
> > INFO [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829 StreamResultFuture.java:180 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Session with /127.0.0.1 is complete
> > WARN [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829 StreamResultFuture.java:207 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Stream failed
> >
> >
> >
> > On Wed, Mar 4, 2015 at 1:18 PM, Yuki Morishita <mo...@gmail.com> wrote:
> >>
> >> Do you have corresponding error in the other side of the stream
> >> (/192.168.56.11)?
> >>
> >> --
> >> Yuki Morishita
> >> t:yukim (http://twitter.com/yukim)
> >
> >
>
>
>
> --
> Yuki Morishita
> t:yukim (http://twitter.com/yukim)
>
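The reducer in the original post hands CqlBulkOutputFormat one List<ByteBuffer> per row. Assuming those values are bound by position to the ? markers of the statement passed to setColumnFamilyInsertStatement (which is how CQLSSTableWriter's rawAddRow treats its list), each element has to be serialized in Cassandra's binary form for its column type, and the list length has to match the marker count (five in the posted INSERT). The JDK-only sketch below shows that encoding for text, int and bigint columns; the class name, helper names and column list are hypothetical, since the real columns are elided in the post. Two other details of the posted code may be worth checking, though they could just be artifacts of trimming it for the list: the reducer writes to CASSANDRA_TABLE1 while the driver registers CASSANDRA_TABLE_NAME with addNamedOutput, and no cleanup() calling multipleOutputs.close() is shown (MultipleOutputs only closes its record writers when close() is called).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: encodes row values the way Cassandra serializes
// text, int and bigint columns, and checks them against the INSERT.
public class BoundValues {

    // Counts '?' bind markers. Naive: it does not skip '?' inside string
    // literals, which is fine for a plain "INSERT ... VALUES (?,?,?)".
    static int countBindMarkers(String cql) {
        int n = 0;
        for (int i = 0; i < cql.length(); i++) {
            if (cql.charAt(i) == '?') {
                n++;
            }
        }
        return n;
    }

    // text/varchar: the raw UTF-8 bytes.
    static ByteBuffer text(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    // int: 4 bytes, big-endian (ByteBuffer's default byte order).
    static ByteBuffer int32(int v) {
        ByteBuffer b = ByteBuffer.allocate(4);
        b.putInt(v);
        b.flip();
        return b;
    }

    // bigint: 8 bytes, big-endian.
    static ByteBuffer bigint(long v) {
        ByteBuffer b = ByteBuffer.allocate(8);
        b.putLong(v);
        b.flip();
        return b;
    }

    // Returns the values in INSERT column order, failing fast if the count
    // does not match the number of bind markers.
    static List<ByteBuffer> bind(String insertCql, ByteBuffer... values) {
        int expected = countBindMarkers(insertCql);
        if (values.length != expected) {
            throw new IllegalArgumentException("INSERT has " + expected
                    + " bind markers but " + values.length + " values were given");
        }
        List<ByteBuffer> row = new ArrayList<ByteBuffer>(values.length);
        for (ByteBuffer v : values) {
            row.add(v);
        }
        return row;
    }

    public static void main(String[] args) {
        // Hypothetical columns; the real list is elided in the post.
        String insert = "INSERT INTO dev.participant (id, name, age, visits, email)"
                + " VALUES (?,?,?,?,?)";
        List<ByteBuffer> row = bind(insert,
                text("p-001"), text("Alice"), int32(42), bigint(1234567890123L),
                text("alice@example.com"));
        System.out.println(row.size());
    }
}
```

Running main prints 5. Building the list through a helper that knows the INSERT string makes a column added to one but not the other fail inside the reducer, instead of surfacing later when the SSTable is written.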
Re: Streaming failures during bulkloading data using CqlBulkOutputFormat
Posted by Yuki Morishita <mo...@gmail.com>.
Thanks.
It looks like a bug. Can you create a ticket on JIRA?
https://issues.apache.org/jira/browse/CASSANDRA
On Thu, Mar 5, 2015 at 7:56 AM, Aby Kuruvilla
<ab...@envisagesystems.com> wrote:
> Hi Yuki
>
> Thanks for the reply!
>
> Here is the log from Cassandra server for the stream failure
>
> INFO [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816
> StreamResultFuture.java:109 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508
> ID#0] Creating new streaming plan for Bulk Load
> INFO [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816
> StreamResultFuture.java:116 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508,
> ID#0] Received streaming plan for Bulk Load
> INFO [STREAM-INIT-/192.168.56.1:58579] 2015-03-04 09:20:23,819
> StreamResultFuture.java:116 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508,
> ID#0] Received streaming plan for Bulk Load
> INFO [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,822
> StreamResultFuture.java:166 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508
> ID#0] Prepare completed. Receiving 1 files(617874 bytes), sending 0 files(0
> bytes)
> WARN [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,823 StreamSession.java:597
> - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Retrying for following
> error
> java.io.IOException: CF d6d35793-729c-3cab-bee0-84e971e48675 was dropped
> during streaming
> at
> org.apache.cassandra.streaming.compress.CompressedStreamReader.read(CompressedStreamReader.java:71)
> ~[main/:na]
> at
> org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:48)
> [main/:na]
> at
> org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:38)
> [main/:na]
> at
> org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:55)
> [main/:na]
> at
> org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245)
> [main/:na]
> at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
> ERROR [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,828 StreamSession.java:477
> - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
> java.lang.IllegalArgumentException: Unknown type 0
> at
> org.apache.cassandra.streaming.messages.StreamMessage$Type.get(StreamMessage.java:89)
> ~[main/:na]
> at
> org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:54)
> ~[main/:na]
> at
> org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245)
> ~[main/:na]
> at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
> INFO [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829
> StreamResultFuture.java:180 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508]
> Session with /127.0.0.1 is complete
> WARN [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829
> StreamResultFuture.java:207 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508]
> Stream failed
>
>
>
> On Wed, Mar 4, 2015 at 1:18 PM, Yuki Morishita <mo...@gmail.com> wrote:
>>
>> Do you have corresponding error in the other side of the stream
>> (/192.168.56.11)?
>>
>> --
>> Yuki Morishita
>> t:yukim (http://twitter.com/yukim)
>
>
--
Yuki Morishita
t:yukim (http://twitter.com/yukim)
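The server-side trace starts with "CF d6d35793-729c-3cab-bee0-84e971e48675 was dropped during streaming". This error appears to be raised when the receiving node finds no table whose id matches the cfId in the incoming file header, so a reasonable first check is whether the id the bulk writer sent matches the id the cluster holds for dev.participant. The version digit of the sent id (the 3 in 3cab) marks it as a name-based MD5 UUID. The JDK-only sketch below reports that, computes a name-based id from the keyspace and table names for comparison (whether the 2.1 writer derives its id this way is an assumption; the class and method names are hypothetical), and optionally compares against a server-side id passed as the first argument. On 2.1 that id should be readable with: SELECT cf_id FROM system.schema_columnfamilies WHERE keyspace_name = 'dev' AND columnfamily_name = 'participant';

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical diagnostic for the "CF <id> was dropped during streaming" error.
public class CfIdCheck {

    // Describes how a UUID was generated, from its version nibble.
    static String describe(UUID id) {
        switch (id.version()) {
            case 1:
                return "time-based (version 1)";
            case 3:
                return "name-based MD5 (version 3)";
            case 4:
                return "random (version 4)";
            default:
                return "version " + id.version();
        }
    }

    // A name-based id built from keyspace + table name. That the 2.1 bulk
    // writer derives its cfId this way is an assumption, not confirmed.
    static UUID nameBasedId(String keyspace, String table) {
        return UUID.nameUUIDFromBytes((keyspace + table).getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // cfId from the client's "Sending File (Header (cfId: ...)" log line.
        UUID sent = UUID.fromString("d6d35793-729c-3cab-bee0-84e971e48675");
        System.out.println("sent:       " + sent + " " + describe(sent));

        UUID guess = nameBasedId("dev", "participant");
        System.out.println("name-based: " + guess + " same as sent? " + guess.equals(sent));

        // Optional: the table id reported by the cluster, as the first argument.
        if (args.length > 0) {
            UUID server = UUID.fromString(args[0]);
            System.out.println("server:     " + server + " " + describe(server));
            System.out.println("sent id known to server? " + server.equals(sent));
        }
    }
}
```

The first line of output reports the sent id as name-based (version 3). If the server's cf_id differs from it, that would be consistent with the receiver rejecting the file.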
Re: Streaming failures during bulkloading data using CqlBulkOutputFormat
Posted by Aby Kuruvilla <ab...@envisagesystems.com>.
Hi Yuki
Thanks for the reply!
Here is the log from the Cassandra server for the stream failure:
INFO [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816 StreamResultFuture.java:109 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Creating new streaming plan for Bulk Load
INFO [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816 StreamResultFuture.java:116 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Received streaming plan for Bulk Load
INFO [STREAM-INIT-/192.168.56.1:58579] 2015-03-04 09:20:23,819 StreamResultFuture.java:116 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Received streaming plan for Bulk Load
INFO [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,822 StreamResultFuture.java:166 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Prepare completed. Receiving 1 files(617874 bytes), sending 0 files(0 bytes)
WARN [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,823 StreamSession.java:597 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Retrying for following error
java.io.IOException: CF d6d35793-729c-3cab-bee0-84e971e48675 was dropped during streaming
    at org.apache.cassandra.streaming.compress.CompressedStreamReader.read(CompressedStreamReader.java:71) ~[main/:na]
    at org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:48) [main/:na]
    at org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:38) [main/:na]
    at org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:55) [main/:na]
    at org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245) [main/:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
ERROR [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,828 StreamSession.java:477 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
java.lang.IllegalArgumentException: Unknown type 0
    at org.apache.cassandra.streaming.messages.StreamMessage$Type.get(StreamMessage.java:89) ~[main/:na]
    at org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:54) ~[main/:na]
    at org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245) ~[main/:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
INFO [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829 StreamResultFuture.java:180 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Session with /127.0.0.1 is complete
WARN [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829 StreamResultFuture.java:207 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Stream failed
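The receiving node rejects the file because it has no table with id d6d35793-729c-3cab-bee0-84e971e48675, and the client's "Sending File (Header (cfId: ...))" entry carries exactly that id. A small JDK-only check shows what kind of UUID it is. This is a diagnostic sketch, not a confirmed cause: the assumption worth verifying is that the server's own id for dev.participant (in 2.1, assumed to be the cf_id column of system.schema_columnfamilies) is a different value. CfIdCheck, describe and sameId are hypothetical names.

```java
import java.util.UUID;

// Hypothetical helper for inspecting ids copied out of Cassandra log lines.
class CfIdCheck {

    // Names the UUID version: 1 = time-based, 3 = name-based (MD5), 4 = random.
    static String describe(String id) {
        UUID uuid = UUID.fromString(id);
        switch (uuid.version()) {
            case 1: return "time-based (version 1)";
            case 3: return "name-based MD5 (version 3)";
            case 4: return "random (version 4)";
            default: return "version " + uuid.version();
        }
    }

    // Compares two ids as UUIDs, so letter case in the hex digits does not matter.
    static boolean sameId(String a, String b) {
        return UUID.fromString(a).equals(UUID.fromString(b));
    }

    public static void main(String[] args) {
        // cfId from "CF ... was dropped during streaming" (server log)
        System.out.println(describe("d6d35793-729c-3cab-bee0-84e971e48675"));
        // stream plan id from the same log, for comparison
        System.out.println(describe("98ba8730-c279-11e4-b8e9-55374d280508"));
    }
}
```

Passing the server's stored id and the logged cfId to sameId tells you directly whether the two sides agree on which table the SSTable belongs to.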
On Wed, Mar 4, 2015 at 1:18 PM, Yuki Morishita <mo...@gmail.com> wrote:
> Do you have a corresponding error on the other side of the stream
> (/192.168.56.11)?
> --
> Yuki Morishita
> t:yukim (http://twitter.com/yukim)
>
Re: Streaming failures during bulkloading data using CqlBulkOutputFormat
Posted by Yuki Morishita <mo...@gmail.com>.
Do you have a corresponding error on the other side of the stream
(/192.168.56.11)?
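Both sides of this bulk load tag every entry with the same stream plan id, [Stream #98ba8730-c279-11e4-b8e9-55374d280508, so the client and server logs can be lined up on it. A minimal plain-Java sketch, assuming the log entries have first been unwrapped to one entry per line (the mail client split many of them); StreamLogFilter, planId and forPlan are hypothetical names, not Cassandra APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical filter that groups log entries by streaming plan id.
class StreamLogFilter {

    // Matches "[Stream #<uuid>" whether followed by "]", " ID#0]" or ", ID#0]".
    private static final Pattern PLAN_ID = Pattern.compile(
            "\\[Stream #([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");

    // Returns the plan id in the line, or null if the line carries none.
    static String planId(String line) {
        Matcher m = PLAN_ID.matcher(line);
        return m.find() ? m.group(1) : null;
    }

    // Keeps only the entries that belong to the given plan, in their original order.
    static List<String> forPlan(List<String> lines, String id) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            if (id.equals(planId(line))) {
                out.add(line);
            }
        }
        return out;
    }
}
```

Run over both logs for this plan id, the server's "Retrying for following error" entry lines up with the client's "Received Retry" entry.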
On Wed, Mar 4, 2015 at 9:11 AM, Aby Kuruvilla
<ab...@envisagesystems.com> wrote:
> I am trying to use the CqlBulkOutputFormat in a Hadoop job to bulk load data
> into Cassandra. I was not able to find any documentation for this new output
> format, but from reading the code, it uses CQLSSTableWriter to write SSTable
> files to disk, which are then streamed to Cassandra using SSTableLoader. When
> I run the Hadoop job, I can see that the SSTable files are generated, but
> streaming the data out fails. I get the same exception with a Cassandra node
> on "localhost" and with a remote Cassandra cluster, and on C* versions 2.1.1,
> 2.1.2 and 2.1.3.
>
> Relevant portion of the logs and stack trace:
>
> 09:20:23.207 [Thread-6] WARN org.apache.cassandra.utils.CLibrary - JNA link failure, one or more native method will be unavailable.
> 09:20:23.208 [Thread-6] DEBUG org.apache.cassandra.utils.CLibrary - JNA link failure details: Error looking up function 'posix_fadvise': dlsym(0x7fff6ab8a5e0, posix_fadvise): symbol not found
> 09:20:23.504 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Filter.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Filter.db
> 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Digest.sha1 to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Digest.sha1
> 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Statistics.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Statistics.db
> 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Index.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Index.db
> 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-CompressionInfo.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-CompressionInfo.db
> 09:20:23.506 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-TOC.txt to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-TOC.txt
> 09:20:23.506 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Data.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Data.db
> 09:20:23.727 [Thread-2] DEBUG o.a.c.i.s.m.MetadataSerializer - Load metadata for /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1
> 09:20:23.729 [Thread-2] INFO o.a.c.io.sstable.SSTableReader - Opening /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1 (617874 bytes)
> 09:20:23.780 [Thread-2] INFO o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Executing streaming plan for Bulk Load
> 09:20:23.781 [StreamConnectionEstablisher:1] INFO o.a.c.streaming.StreamSession - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Starting streaming to /192.168.56.11
> 09:20:23.781 [StreamConnectionEstablisher:1] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending stream init for incoming stream
> 09:20:23.792 [StreamConnectionEstablisher:1] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending stream init for outgoing stream
> 09:20:23.794 [STREAM-OUT-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending Prepare (0 requests, 1 files}
> 09:20:23.795 [StreamConnectionEstablisher:1] INFO o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Prepare completed. Receiving 0 files(0 bytes), sending 1 files(617874 bytes)
> 09:20:23.799 [StreamConnectionEstablisher:1] INFO o.a.c.streaming.StreamCoordinator - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Beginning stream session with /192.168.56.11
> 09:20:23.799 [STREAM-OUT-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending File (Header (cfId: d6d35793-729c-3cab-bee0-84e971e48675, #0, version: ka, estimated keys: 3072, transfer size: 617874, compressed?: true, repairedAt: 0), file: /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Data.db)
> 09:20:23.808 [STREAM-IN-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Received Retry (d6d35793-729c-3cab-bee0-84e971e48675, #0)
> 09:20:23.808 [STREAM-IN-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Received Session Failed
> 09:20:23.809 [STREAM-IN-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Closing stream connection handler on /192.168.56.11
> 09:20:23.811 [STREAM-IN-/192.168.56.11] INFO o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Session with /192.168.56.11 is complete
> 09:20:23.812 [STREAM-IN-/192.168.56.11] WARN o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Stream failed
> 09:20:23.815 [STREAM-OUT-/192.168.56.11] ERROR o.a.c.streaming.StreamSession - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
> java.io.IOException: Broken pipe
>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.7.0_51]
>     at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.7.0_51]
>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.7.0_51]
>     at sun.nio.ch.IOUtil.write(IOUtil.java:51) ~[na:1.7.0_51]
>     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487) ~[na:1.7.0_51]
>     at sun.nio.ch.FileChannelImpl.transferToTrustedChannel(FileChannelImpl.java:473) ~[na:1.7.0_51]
>     at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:569) ~[na:1.7.0_51]
>     at org.apache.cassandra.streaming.compress.CompressedStreamWriter.write(CompressedStreamWriter.java:74) ~[cassandra-all-2.1.2.jar:2.1.2]
>     at org.apache.cassandra.streaming.messages.OutgoingFileMessage$1.serialize(OutgoingFileMessage.java:56) ~[cassandra-all-2.1.2.jar:2.1.2]
>     at org.apache.cassandra.streaming.messages.OutgoingFileMessage$1.serialize(OutgoingFileMessage.java:40) ~[cassandra-all-2.1.2.jar:2.1.2]
>     at org.apache.cassandra.streaming.messages.StreamMessage.serialize(StreamMessage.java:45) ~[cassandra-all-2.1.2.jar:2.1.2]
>     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.sendMessage(ConnectionHandler.java:346) [cassandra-all-2.1.2.jar:2.1.2]
>     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:318) [cassandra-all-2.1.2.jar:2.1.2]
>     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
> 09:20:23.816 [STREAM-OUT-/192.168.56.11] ERROR o.a.c.streaming.StreamSession - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
> java.io.IOException: Broken pipe
>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.7.0_51]
>     at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.7.0_51]
>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.7.0_51]
>     at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[na:1.7.0_51]
>     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487) ~[na:1.7.0_51]
>     at org.apache.cassandra.io.util.DataOutputStreamAndChannel.write(DataOutputStreamAndChannel.java:48) ~[cassandra-all-2.1.2.jar:2.1.2]
>     at org.apache.cassandra.streaming.messages.StreamMessage.serialize(StreamMessage.java:44) ~[cassandra-all-2.1.2.jar:2.1.2]
>     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.sendMessage(ConnectionHandler.java:346) [cassandra-all-2.1.2.jar:2.1.2]
>     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:326) [cassandra-all-2.1.2.jar:2.1.2]
>     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>
> ----------------------------------------------------
>
> Here is what I have tried
>
>
> Hadoop Driver
>
> public class CassandraBulkImporter extends Configured implements Tool {
>
>     .....
>
>     public static void main(String[] args) throws Exception {
>         int exitCode = ToolRunner.run(new CassandraBulkImporter(), args);
>         System.exit(exitCode);
>     }
>
>     @Override
>     public int run(String[] arg0) throws Exception {
>         .......
>         Job job = new Job(conf);
>         ......
>         job.setOutputFormatClass(CqlBulkOutputFormat.class);
>
>         ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "192.168.56.11");
>         ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>         ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
>         ConfigHelper.setOutputKeyspace(job.getConfiguration(), CASSANDRA_KEYSPACE_NAME);
>         ConfigHelper.setOutputColumnFamily(
>                 job.getConfiguration(),
>                 CASSANDRA_KEYSPACE_NAME,
>                 CASSANDRA_TABLE_NAME
>         );
>
>         // Set the properties for CqlBulkOutputFormat
>         MultipleOutputs.addNamedOutput(job, CASSANDRA_TABLE_NAME,
>                 CqlBulkOutputFormat.class, Object.class, List.class);
>         CqlBulkOutputFormat.setColumnFamilySchema(job.getConfiguration(),
>                 CASSANDRA_TABLE_NAME, "CREATE TABLE dev.participant(........)");
>         CqlBulkOutputFormat.setColumnFamilyInsertStatement(job.getConfiguration(),
>                 CASSANDRA_TABLE_NAME, "INSERT into dev.participant(........) values (?,?,?,?,?) ");
>
>         .....
>     }
>
> }
>
> Reducer Code
>
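> public class ReducerToCassandra extends Reducer<Text, Text, Object, List<ByteBuffer>> {
>
>     private MultipleOutputs<Object, List<ByteBuffer>> multipleOutputs;
>
>     @Override
>     protected void setup(Context context) throws IOException, InterruptedException {
>         multipleOutputs = new MultipleOutputs<Object, List<ByteBuffer>>(context);
>     }
>
>     @Override
>     public void reduce(Text id, Iterable<Text> pInfo, Context context)
>             throws IOException, InterruptedException {
>         ....
>         List<ByteBuffer> bVariables = new ArrayList<ByteBuffer>();
>
>         .....
>         // The name must match the one registered with MultipleOutputs.addNamedOutput in the driver
>         multipleOutputs.write(CASSANDRA_TABLE_NAME, null, bVariables);
>     }
>
>     @Override
>     protected void cleanup(Context context) throws IOException, InterruptedException {
>         // Closing MultipleOutputs closes the record writers it opened
>         multipleOutputs.close();
>     }
> }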
>
>
>
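In the reducer, bVariables holds one serialized value per ? marker of the INSERT statement, in the same order. The real column types are elided in the post, so the sketch below assumes five hypothetical columns (three text, one int, one bigint) and uses only the JDK: CQL serializes text as UTF-8 bytes, int as 4-byte big-endian and bigint as 8-byte big-endian. In a real job, the ByteBufferUtil.bytes(...) overloads in org.apache.cassandra.utils do the same work; BoundValues and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helpers that serialize values the way CQL expects for
// text, int and bigint columns.
class BoundValues {

    static ByteBuffer text(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    static ByteBuffer int32(int v) {
        ByteBuffer b = ByteBuffer.allocate(4); // big-endian by default
        b.putInt(0, v); // absolute put keeps position at 0, so the buffer is ready to read
        return b;
    }

    static ByteBuffer int64(long v) {
        ByteBuffer b = ByteBuffer.allocate(8);
        b.putLong(0, v);
        return b;
    }

    // One entry per "?" in the INSERT, in the same order as the column list.
    static List<ByteBuffer> row(String id, String firstName, String lastName,
                                int age, long createdAt) {
        List<ByteBuffer> values = new ArrayList<>();
        values.add(text(id));
        values.add(text(firstName));
        values.add(text(lastName));
        values.add(int32(age));
        values.add(int64(createdAt));
        return values;
    }
}
```

The order matters more than the helper names: a value placed at the wrong index is written into the wrong column, or rejected if the byte length does not fit the column type.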
--
Yuki Morishita
t:yukim (http://twitter.com/yukim)