You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@cassandra.apache.org by Gijs Stuurman <gi...@millmobile.com> on 2012/08/10 14:45:24 UTC
Problem with building Cascading tap for Cassandra
Hi all,
I'm trying to build a Cascading tap for Cassandra. Cascading is a
layer on top of Hadoop. For this purpose I use ColumnFamilyInputFormat and
ColumnFamilyRecordReader from Cassandra.
I ran into a problem where the record reader creates an endless
iterator, because something goes wrong with the start token of the
batches the ColumnFamilyRecordReader gets out of Cassandra.
This situation is explained in a comment on the following Jira issue:
https://issues.apache.org/jira/browse/CASSANDRA-4229
The reply on the issue says that the behavior is caused by some keys of a
row being modified. The suggested solution is to copy all the
ByteBuffers that are used.
I have added ByteBufferUtil.clone liberally, but the problem persists.
Any suggestions on what might be causing this?
Below are the two files that make up the Cascading tap; they use the
ColumnFamilyInputFormat and ColumnFamilyRecordReader from Cassandra
version 1.1.2:
CassandraScheme.java
package cascalog.cassandra;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.util.Util;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.fs.Path;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.SortedMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.nio.ByteBuffer;
import org.apache.cassandra.db.IColumn;
/**
 * Cascading {@link Scheme} that reads rows of a single Cassandra column family through
 * {@link ColumnFamilyInputFormat} and emits one tuple of strings per row.
 *
 * <p>Each tuple holds the row key first, followed by one entry per configured column
 * field name, in list order. A column that is missing from a row yields {@code null}.
 * Sinking is not implemented: {@link #sink} always throws
 * {@link UnsupportedOperationException}.
 */
public class CassandraScheme
    extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {

  /** Random per-instance identifier; used as the placeholder input path and in equality. */
  private final String pathUUID;
  private final String host;
  private final String port;
  private final String keyspace;
  private final String columnFamily;
  /** Names of the columns to fetch; also fixes the order of the tuple entries after the key. */
  private final List<String> columnFieldNames;

  /**
   * Creates a scheme for one column family.
   *
   * @param host             initial Cassandra address handed to {@link ConfigHelper}
   * @param port             RPC port handed to {@link ConfigHelper}
   * @param keyspace         keyspace that contains the column family
   * @param columnFamily     column family to read
   * @param columnFieldNames names of the columns to fetch for every row; the list is
   *                         copied, so later changes by the caller have no effect
   * @throws NullPointerException if {@code columnFieldNames} is {@code null}
   */
  public CassandraScheme(String host, String port, String keyspace,
      String columnFamily, List<String> columnFieldNames) {
    this.host = host;
    this.port = port;
    this.keyspace = keyspace;
    this.columnFamily = columnFamily;
    // Defensive copy: the scheme must not change when the caller mutates its list.
    this.columnFieldNames = new ArrayList<String>(columnFieldNames);
    this.pathUUID = UUID.randomUUID().toString();
    // Source and sink fields are not set here, so the Scheme defaults stay in effect.
  }

  /**
   * Allocates the key and value holders once and stores them as the call context so
   * that {@link #source} can hand the same objects to the record reader on every call.
   */
  @Override
  public void sourcePrepare(FlowProcess<JobConf> flowProcess,
      SourceCall<Object[], RecordReader> sourceCall) {
    RecordReader reader = sourceCall.getInput();
    ByteBuffer key = ByteBufferUtil.clone((ByteBuffer) reader.createKey());
    // Unchecked: the raw RecordReader only exposes Object.
    // NOTE(review): presumably the Cassandra reader creates a SortedMap<ByteBuffer, IColumn> - confirm.
    @SuppressWarnings("unchecked")
    SortedMap<ByteBuffer, IColumn> value =
        (SortedMap<ByteBuffer, IColumn>) reader.createValue();
    sourceCall.setContext(new Object[] {key, value});
  }

  /** Drops the key/value holders created by {@link #sourcePrepare}. */
  @Override
  public void sourceCleanup(FlowProcess<JobConf> flowProcess,
      SourceCall<Object[], RecordReader> sourceCall) {
    sourceCall.setContext(null);
  }

  /**
   * Reads the next row from the record reader and publishes it as the incoming tuple.
   *
   * @return {@code true} if a row was read and a tuple was set, {@code false} when the
   *         reader has no more rows
   * @throws IOException if the reader fails or a key/column value cannot be decoded
   */
  @Override
  @SuppressWarnings("unchecked") // raw RecordReader.next(Object, Object) and the value cast
  public boolean source(FlowProcess<JobConf> flowProcess,
      SourceCall<Object[], RecordReader> sourceCall) throws IOException {
    Object[] context = sourceCall.getContext();
    Object key = context[0];
    Object value = context[1];

    if (!sourceCall.getInput().next(key, value)) {
      return false;
    }

    // The context key is handed to the reader again on the next call, so decode from a
    // private copy rather than from the shared buffer.
    ByteBuffer rowKey = ByteBufferUtil.clone((ByteBuffer) key);
    SortedMap<ByteBuffer, IColumn> columns = (SortedMap<ByteBuffer, IColumn>) value;

    Tuple result = new Tuple();
    String rowKeyText = ByteBufferUtil.string(rowKey);
    result.add(rowKeyText);

    for (String columnFieldName : columnFieldNames) {
      IColumn col = columns.get(ByteBufferUtil.bytes(columnFieldName));
      if (col != null) {
        result.add(ByteBufferUtil.string(ByteBufferUtil.clone(col.value())));
      } else {
        // Keep the tuple width fixed: a missing column becomes a null entry.
        result.add(null);
      }
    }

    sourceCall.getIncomingEntry().setTuple(result);
    return true;
  }

  /**
   * Sinking is not supported yet.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[],
      OutputCollector> sinkCall)
      throws IOException {
    System.out.println("sink");
    throw new UnsupportedOperationException("TODO");
  }

  /** No sink configuration is performed; only a trace line is printed. */
  @Override
  public void sinkConfInit(FlowProcess<JobConf> process,
      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
    System.out.println("sinkConfInit");
  }

  /**
   * Configures the job to read the column family: input format, batch and split sizes,
   * connection settings, partitioner, and a slice predicate limited to the configured
   * column names.
   */
  @Override
  public void sourceConfInit(FlowProcess<JobConf> process,
      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
    // NOTE(review): the UUID path looks like a placeholder so the job has an input path - confirm.
    FileInputFormat.addInputPaths(conf, getPath().toString());
    conf.setInputFormat(ColumnFamilyInputFormat.class);

    ConfigHelper.setRangeBatchSize(conf, 100);
    ConfigHelper.setInputSplitSize(conf, 30);
    ConfigHelper.setInputRpcPort(conf, port);
    ConfigHelper.setInputInitialAddress(conf, host);
    ConfigHelper.setInputPartitioner(conf, "RandomPartitioner");
    ConfigHelper.setInputColumnFamily(conf, keyspace, columnFamily);

    List<ByteBuffer> columnNames = new ArrayList<ByteBuffer>(columnFieldNames.size());
    for (String columnFieldName : columnFieldNames) {
      columnNames.add(ByteBufferUtil.bytes(columnFieldName));
    }
    SlicePredicate predicate = new SlicePredicate().setColumn_names(columnNames);
    ConfigHelper.setInputSlicePredicate(conf, predicate);
  }

  /** Returns the per-instance path built from the random UUID chosen at construction. */
  public Path getPath() {
    return new Path(pathUUID);
  }

  /** Returns {@code host_port_keyspace_columnFamily}. */
  public String getIdentifier() {
    return host + "_" + port + "_" + keyspace + "_" + columnFamily;
  }

  /**
   * Two schemes are equal when the superclass considers them equal and they share the
   * same path, host, port, keyspace and column family - the same fields that
   * {@link #hashCode()} mixes in, so equal schemes always hash alike.
   */
  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof CassandraScheme)) {
      return false;
    }
    if (!super.equals(other)) {
      return false;
    }
    CassandraScheme that = (CassandraScheme) other;
    return getPath().toString().equals(that.getPath().toString())
        && sameValue(host, that.host)
        && sameValue(port, that.port)
        && sameValue(keyspace, that.keyspace)
        && sameValue(columnFamily, that.columnFamily);
  }

  @Override
  public int hashCode() {
    int result = super.hashCode();
    result = 31 * result + getPath().toString().hashCode();
    result = 31 * result + (host != null ? host.hashCode() : 0);
    result = 31 * result + (port != null ? port.hashCode() : 0);
    result = 31 * result + (keyspace != null ? keyspace.hashCode() : 0);
    result = 31 * result + (columnFamily != null ? columnFamily.hashCode() : 0);
    return result;
  }

  /** Null-safe string equality, matching the null handling in {@link #hashCode()}. */
  private static boolean sameValue(String a, String b) {
    return a == null ? b == null : a.equals(b);
  }
}
CassandraTap.java
package cascalog.cassandra;
import cascading.tap.Tap;
import cascading.scheme.Scheme;
import cascading.tap.SinkMode;
import cascading.flow.FlowProcess;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.OutputCollector;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleEntrySchemeIterator;
import cascading.tuple.TupleEntryCollector;
import java.io.IOException;
public class CassandraTap extends Tap<JobConf, RecordReader,
OutputCollector> {
public final String id = "TEMP_ID";
public CassandraScheme scheme;
public CassandraTap(CassandraScheme scheme) {
super(scheme);
this.scheme = scheme;
}
@Override
public String getIdentifier() {
return id + "_" + scheme.getIdentifier();
}
@Override
public TupleEntryIterator openForRead(FlowProcess<JobConf>
jobConfFlowProcess, RecordReader recordReader) throws IOException {
return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this ,
recordReader);
}
@Override
public TupleEntryCollector openForWrite(FlowProcess<JobConf>
jobConfFlowProcess, OutputCollector outputCollector) throws IOException {
throw new UnsupportedOperationException("TODO");
}
@Override
public boolean createResource(JobConf jobConf) throws IOException {
// TODO
return true;
}
@Override
public boolean deleteResource(JobConf jobConf) throws IOException {
// TODO
return true;
}
@Override
public boolean resourceExists(JobConf jobConf) throws IOException {
// TODO check if column-family exists
return true;
}
@Override
public long getModifiedTime(JobConf jobConf) throws IOException {
// TODO could read this from tables
return System.currentTimeMillis(); // currently unable to find last mod
time
// on a table
}
@Override
public boolean isSource()
{
return true;
}
@Override
public boolean isSink()
{
return false;
}
@Override
public boolean equals(Object other) {
if( this == other )
return true;
if( !( other instanceof CassandraTap ) )
return false;
if( !super.equals( other ) )
return false;
CassandraTap otherTap = (CassandraTap) other;
if (!otherTap.getIdentifier().equals(getIdentifier())) return
false;
return true;
}
@Override
public int hashCode(){
int result = super.hashCode();
result = 31 * result + getIdentifier().hashCode();
return result;
}
}