You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@cassandra.apache.org by Gijs Stuurman <gi...@millmobile.com> on 2012/08/10 14:45:24 UTC

Problem with building Cascading tap for Cassandra

Hi all,

I'm trying to build a Cascading tap for Cassandra. Cascading is a
layer on top of Hadoop. For this purpose I use ColumnFamilyInputFormat and
ColumnFamilyRecordReader from Cassandra.

I ran into a problem that the record reader would create an endless
iterator because something goes wrong with the starttoken of the
batches the ColumnFamilyRecordReader gets out of Cassandra.

In this comment on an issue Jira this situation is
explained:https://issues.apache.org/jira/browse/CASSANDRA-4229

The reply on the issue that the behavior is caused by some keys of a
row being modified. The suggested solution is to copy all the
bytebuffers that are used.

I have added ByteBufferUtil.clone liberally, put the problem persists.

Any suggestions on what might be causing this?

Below the two files that make up the Cascading tap, these use the
ColumnFamilyInputFormat and ColumnFamilyRecordReader from Cassandra
version 1.1.2:

CassandraScheme.java
package cascalog.cassandra;

import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.util.Util;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.fs.Path;

import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.SortedMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.nio.ByteBuffer;

import org.apache.cassandra.db.IColumn;

public class CassandraScheme
    extends Scheme<JobConf, RecordReader, OutputCollector, Object[],
Object[]> {

    private String pathUUID;
    private String host;
    private String port;
    private String keyspace;
    private String columnFamily;
    private List<String> columnFieldNames;


    public CassandraScheme(String host, String port, String keyspace,
String columnFamily, List<String> columnFieldNames) {

      this.host = host;
      this.port = port;
      this.keyspace = keyspace;
      this.columnFamily = columnFamily;
      this.columnFieldNames = columnFieldNames;

      this.pathUUID = UUID.randomUUID().toString();
      //setSourceFields(new Fields("text3")); // default is unknown
      //setSinkFields
  }

  @Override
  public void sourcePrepare(FlowProcess<JobConf> flowProcess,
      SourceCall<Object[], RecordReader> sourceCall) {

      ByteBuffer key =
ByteBufferUtil.clone((ByteBuffer)sourceCall.getInput().createKey());
      SortedMap<ByteBuffer, IColumn> value = (SortedMap<ByteBuffer,
IColumn>)sourceCall.getInput().createValue();

      Object[] obj = new Object[]{key, value};

    sourceCall.setContext(obj);
  }

  @Override
  public void sourceCleanup(FlowProcess<JobConf> flowProcess,
      SourceCall<Object[], RecordReader> sourceCall) {
      sourceCall.setContext(null);
  }

  @Override
  public boolean source(FlowProcess<JobConf> flowProcess,
      SourceCall<Object[], RecordReader> sourceCall) throws IOException {
    Tuple result = new Tuple();

    Object key = sourceCall.getContext()[0];
    Object value = sourceCall.getContext()[1];

    boolean hasNext = sourceCall.getInput().next(key, value);
    if (!hasNext) { return false; }

    ByteBuffer orgkey = (ByteBuffer)key;
    ByteBuffer rowkey = ByteBufferUtil.clone(orgkey);

    SortedMap<ByteBuffer, IColumn> columns = (SortedMap<ByteBuffer,
IColumn>) value;

    String rowkey_str = ByteBufferUtil.string(rowkey);

    result.add(rowkey_str);

    for (String columnFieldName: columnFieldNames) {
        IColumn col = columns.get(ByteBufferUtil.bytes(columnFieldName));
        if (col != null) {

result.add(ByteBufferUtil.string(ByteBufferUtil.clone(col.value())));
        } else {
            result.add(null);
        }
    }
    sourceCall.getIncomingEntry().setTuple(result);
    return true;

  }


  @Override
  public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[],
OutputCollector> sinkCall)
      throws IOException {
      System.out.println("sink");
    TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
    OutputCollector outputCollector = sinkCall.getOutput();
    throw new UnsupportedOperationException("TODO");
    //outputCollector.collect(null, put);
  }

  @Override
  public void sinkConfInit(FlowProcess<JobConf> process,
      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
      System.out.println("sinkConfInit");
  }

  @Override
  public void sourceConfInit(FlowProcess<JobConf> process,
      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {

      FileInputFormat.addInputPaths(conf, getPath().toString());
      conf.setInputFormat(ColumnFamilyInputFormat.class);

      ConfigHelper.setRangeBatchSize(conf, 100);
      ConfigHelper.setInputSplitSize(conf, 30);
      ConfigHelper.setInputRpcPort(conf, port);
      ConfigHelper.setInputInitialAddress(conf, host);
      ConfigHelper.setInputPartitioner(conf, "RandomPartitioner");
      ConfigHelper.setInputColumnFamily(conf, keyspace, columnFamily);

      List<ByteBuffer> columnNames = new ArrayList<ByteBuffer>();
      for (String columnFieldName: columnFieldNames) {
          columnNames.add(ByteBufferUtil.bytes(columnFieldName));
      }

      SlicePredicate predicate = new
SlicePredicate().setColumn_names(columnNames);
      ConfigHelper.setInputSlicePredicate(conf, predicate);

  }

  public Path getPath() {
        return new Path(pathUUID);
  }

  public String getIdentifier(){
      return host + "_" + port + "_" + keyspace + "_" + columnFamily;
  }

  @Override
      public boolean equals( Object other ) {
      if( this == other )
          return true;
      if( !( other instanceof CassandraScheme ) )
          return false;
      if( !super.equals( other ) )
          return false;

      CassandraScheme that = (CassandraScheme) other;

      if(!getPath().toString().equals(that.getPath().toString()))
          return false;

      return true;
  }

  @Override
      public int hashCode() {
      int result = super.hashCode();
      result = 31 * result + getPath().toString().hashCode();
      result = 31 * result + ( host != null ? host.hashCode() : 0 );
      result = 31 * result + ( port != null ? port.hashCode() : 0 );
      result = 31 * result + ( keyspace != null ? keyspace.hashCode() : 0 );
      result = 31 * result + ( columnFamily != null ?
columnFamily.hashCode() : 0 );
      return result;
  }

}


CassandraTap.java
package cascalog.cassandra;

import cascading.tap.Tap;
import cascading.scheme.Scheme;

import cascading.tap.SinkMode;

import cascading.flow.FlowProcess;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.OutputCollector;

import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;

import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleEntrySchemeIterator;
import cascading.tuple.TupleEntryCollector;

import java.io.IOException;

public class CassandraTap extends Tap<JobConf, RecordReader,
OutputCollector> {

    public final String id = "TEMP_ID";

    public CassandraScheme scheme;
    public CassandraTap(CassandraScheme scheme) {

        super(scheme);
        this.scheme = scheme;
    }

  @Override
  public String getIdentifier() {
      return id + "_" + scheme.getIdentifier();
  }

  @Override
      public TupleEntryIterator openForRead(FlowProcess<JobConf>
jobConfFlowProcess, RecordReader recordReader) throws IOException {
      return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this ,
recordReader);
  }

  @Override
  public TupleEntryCollector openForWrite(FlowProcess<JobConf>
jobConfFlowProcess, OutputCollector outputCollector) throws IOException {
      throw new UnsupportedOperationException("TODO");
  }


      @Override
  public boolean createResource(JobConf jobConf) throws IOException {
          // TODO
    return true;
  }


  @Override
  public boolean deleteResource(JobConf jobConf) throws IOException {
      // TODO
    return true;
  }


  @Override
  public boolean resourceExists(JobConf jobConf) throws IOException {
      // TODO check if column-family exists
    return true;
  }


    @Override
  public long getModifiedTime(JobConf jobConf) throws IOException {
        // TODO could read this from tables
    return System.currentTimeMillis(); // currently unable to find last mod
time
                                       // on a table
  }

  @Override
      public boolean isSource()
  {
      return true;

  }
  @Override
      public boolean isSink()
  {
      return false;

  }

        @Override
    public boolean equals(Object other) {
            if( this == other )
                return true;
            if( !( other instanceof CassandraTap ) )
                return false;
            if( !super.equals( other ) )
                return false;

            CassandraTap otherTap = (CassandraTap) other;
            if (!otherTap.getIdentifier().equals(getIdentifier())) return
false;

            return true;
    }

    @Override
    public int hashCode(){
        int result = super.hashCode();
        result = 31 * result + getIdentifier().hashCode();

        return result;
    }
}