Posted to dev@gora.apache.org by Henry Saputra <he...@gmail.com> on 2011/07/22 02:39:16 UTC

Re: svn commit: r1149420 - in /incubator/gora/trunk: gora-cassandra/ivy/ gora-cassandra/lib-ext/ gora-cassandra/src/main/java/org/apache/gora/cassandra/client/ gora-cassandra/src/main/java/org/apache/gora/cassandra/query/ gora-cassandra/src/main/java

Looks like some files are missing the Apache license header.
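
In case it helps, here is a rough sketch of how one could list the affected files. It assumes that a file counts as licensed when the phrase "Licensed to the Apache Software Foundation (ASF)" shows up near the top; the class name LicenseHeaderCheck and its methods are made up for illustration and are not part of Gora. Something like Apache RAT would be the more thorough way to audit this.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Gora: lists .java files without the ASF header.
public class LicenseHeaderCheck {

  // Phrase from the standard ASF header; its presence is used as a heuristic (assumption).
  public static final String MARKER = "Licensed to the Apache Software Foundation (ASF)";

  // Returns true if the marker phrase appears within the first 2048 characters.
  public static boolean hasHeader(String source) {
    String head = source.length() > 2048 ? source.substring(0, 2048) : source;
    return head.contains(MARKER);
  }

  // Walks the tree under root and collects the .java files that lack the header.
  public static List<Path> findMissing(Path root) throws IOException {
    List<Path> javaFiles;
    try (Stream<Path> paths = Files.walk(root)) {
      javaFiles = paths
          .filter(Files::isRegularFile)
          .filter(p -> p.toString().endsWith(".java"))
          .collect(Collectors.toList());
    }
    List<Path> missing = new ArrayList<>();
    for (Path p : javaFiles) {
      String source = new String(Files.readAllBytes(p), StandardCharsets.UTF_8);
      if (!hasHeader(source)) {
        missing.add(p);
      }
    }
    return missing;
  }

  public static void main(String[] args) throws IOException {
    // Root directory to scan; defaults to the current directory.
    Path root = Paths.get(args.length > 0 ? args[0] : ".");
    for (Path p : findMissing(root)) {
      System.out.println("Missing header: " + p);
    }
  }
}
```

Run against gora-cassandra/src/main/java, this should print one line per file whose header is absent.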

- Henry

On Thu, Jul 21, 2011 at 5:34 PM,  <al...@apache.org> wrote:
> Author: alexis
> Date: Fri Jul 22 00:33:59 2011
> New Revision: 1149420
>
> URL: http://svn.apache.org/viewvc?rev=1149420&view=rev
> Log:
> Cassandra 0.8 backend with Hector client
>
> Added:
>    incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar   (with props)
>    incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar   (with props)
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
> Removed:
>    incubator/gora/trunk/gora-cassandra/lib-ext/apache-cassandra-0.6.4.jar
>    incubator/gora/trunk/gora-cassandra/lib-ext/libthrift-r917130.jar
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/client/
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraColumn.java
> Modified:
>    incubator/gora/trunk/gora-cassandra/ivy/ivy.xml
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
>    incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java
>
> Modified: incubator/gora/trunk/gora-cassandra/ivy/ivy.xml
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/ivy/ivy.xml?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/ivy/ivy.xml (original)
> +++ incubator/gora/trunk/gora-cassandra/ivy/ivy.xml Fri Jul 22 00:33:59 2011
> @@ -22,7 +22,7 @@
>       organisation="org.apache.gora"
>       module="gora-cassandra"
>       status="integration"/>
> -
> +
>   <configurations>
>     <include file="${project.dir}/ivy/ivy-configurations.xml"/>
>   </configurations>
> @@ -32,15 +32,25 @@
>     <artifact name="gora-cassandra-test" conf="test"/>
>   </publications>
>
> +
>   <dependencies>
>     <!-- conf="*->@" means every conf is mapped to the conf of the same name of the artifact-->
> -    <dependency org="org.apache.gora" name="gora-core" rev="latest.integration" changing="true" conf="*->@"/>
> -    <dependency org="org.jdom" name="jdom" rev="1.1" conf="*->master"/>
> -
> -    <dependency org="org.slf4j" name="slf4j-jdk14" rev="1.5.8" conf="*->master"/>
> -    <dependency org="org.slf4j" name="slf4j-api" rev="1.5.8" conf="*->master"/>
> -
> -    <dependency org="com.google.guava" name="guava" rev="r06"/>
> +
> +    <dependency org="org.apache.gora" name="gora-core" rev="latest.integration" changing="true" conf="*->@"/>
> +
> +    <dependency org="org.jdom" name="jdom" rev="1.1">
> +       <exclude org="xerces" name="xercesImpl"/>
> +    </dependency>
> +
> +    <!--
> +        <dependency org="org.apache.cassandra" name="apache-cassandra" rev="0.8.1"/>
> +       <dependency org="me.prettyprint" name="hector" rev="0.8.0-1"/>
> +    -->
> +    <dependency org="org.apache.cassandra" name="cassandra-thrift" rev="0.8.1"/>
> +    <dependency org="com.ecyrd.speed4j" name="speed4j" rev="0.9" conf="*->*,!javadoc,!sources"/>
> +    <dependency org="com.github.stephenc.high-scale-lib" name="high-scale-lib" rev="1.1.2" conf="*->*,!javadoc,!sources"/>
> +    <dependency org="com.google.collections" name="google-collections" rev="1.0" conf="*->*,!javadoc,!sources"/>
> +    <dependency org="com.google.guava" name="guava" rev="r09" conf="*->*,!javadoc,!sources"/>
>
>     <!-- test dependencies -->
>
>
> Added: incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar?rev=1149420&view=auto
> ==============================================================================
> Binary file - no diff available.
>
> Propchange: incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar
> ------------------------------------------------------------------------------
>    svn:mime-type = application/octet-stream
>
> Added: incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar?rev=1149420&view=auto
> ==============================================================================
> Binary file - no diff available.
>
> Propchange: incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar
> ------------------------------------------------------------------------------
>    svn:mime-type = application/octet-stream
>
> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1149420&view=auto
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (added)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Fri Jul 22 00:33:59 2011
> @@ -0,0 +1,41 @@
> +package org.apache.gora.cassandra.query;
> +
> +import org.apache.avro.Schema.Field;
> +
> +
> +/**
> + * Represents a unit of data: a key value pair tagged by a family name
> + */
> +public abstract class CassandraColumn {
> +  public static final int SUB = 0;
> +  public static final int SUPER = 1;
> +
> +  private String family;
> +  private int type;
> +  private Field field;
> +
> +  public String getFamily() {
> +    return family;
> +  }
> +  public void setFamily(String family) {
> +    this.family = family;
> +  }
> +  public int getType() {
> +    return type;
> +  }
> +  public void setType(int type) {
> +    this.type = type;
> +  }
> +  public void setField(Field field) {
> +    this.field = field;
> +  }
> +
> +  protected Field getField() {
> +    return this.field;
> +  }
> +
> +  public abstract String getName();
> +  public abstract Object getValue();
> +
> +
> +}
>
> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java (original)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java Fri Jul 22 00:33:59 2011
> @@ -1,93 +0,0 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - *     http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -package org.apache.gora.cassandra.query;
> -
> -import java.io.DataInput;
> -import java.io.DataOutput;
> -import java.io.IOException;
> -
> -import org.apache.gora.persistency.Persistent;
> -import org.apache.gora.query.Query;
> -import org.apache.gora.query.impl.PartitionQueryImpl;
> -import org.apache.hadoop.io.Text;
> -
> -public class CassandraPartitionQuery<K, T extends Persistent>
> -extends PartitionQueryImpl<K, T> {
> -
> -  private String startToken;
> -
> -  private String endToken;
> -
> -  private String[] endPoints;
> -
> -  private int splitSize;
> -
> -  public CassandraPartitionQuery() {
> -    this.dataStore = null;
> -  }
> -
> -  public CassandraPartitionQuery(Query<K, T> baseQuery, String startToken, String endToken, String[] endPoints,
> -      int splitSize) {
> -    super(baseQuery);
> -    this.startToken = startToken;
> -    this.endToken = endToken;
> -    this.endPoints = endPoints;
> -    this.splitSize = splitSize;
> -  }
> -
> -  @Override
> -  public void write(DataOutput out) throws IOException {
> -    super.write(out);
> -    Text.writeString(out, startToken);
> -    Text.writeString(out, endToken);
> -    out.writeInt(endPoints.length);
> -    for (String endPoint : endPoints) {
> -      Text.writeString(out, endPoint);
> -    }
> -    out.writeInt(splitSize);
> -  }
> -
> -  @Override
> -  public void readFields(DataInput in) throws IOException {
> -    super.readFields(in);
> -    startToken = Text.readString(in);
> -    endToken = Text.readString(in);
> -    int size = in.readInt();
> -    endPoints = new String[size];
> -    for (int i = 0; i < size; i++) {
> -      endPoints[i] = Text.readString(in);
> -    }
> -    splitSize = in.readInt();
> -  }
> -
> -  public String getStartToken() {
> -    return startToken;
> -  }
> -
> -  public String getEndToken() {
> -    return endToken;
> -  }
> -
> -  public String[] getEndPoints() {
> -    return endPoints;
> -  }
> -
> -  public int getSplitSize() {
> -    return splitSize;
> -  }
> -}
>
> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java (original)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java Fri Jul 22 00:33:59 2011
> @@ -1,34 +1,55 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - *     http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
>  package org.apache.gora.cassandra.query;
>
> +import java.util.List;
> +import java.util.Map;
> +
>  import org.apache.gora.persistency.Persistent;
> +import org.apache.gora.query.Query;
>  import org.apache.gora.query.impl.QueryBase;
>  import org.apache.gora.store.DataStore;
>
> -public class CassandraQuery<K, T extends Persistent>
> -extends QueryBase<K, T> {
> +public class CassandraQuery<K, T extends Persistent> extends QueryBase<K, T> {
>
> +  private Query<K, T> query;
> +
> +  /**
> +   * Maps Avro fields to Cassandra columns.
> +   */
> +  private Map<String, List<String>> familyMap;
> +
>   public CassandraQuery() {
>     super(null);
>   }
> -
>   public CassandraQuery(DataStore<K, T> dataStore) {
>     super(dataStore);
>   }
> +  public void setFamilyMap(Map<String, List<String>> familyMap) {
> +    this.familyMap = familyMap;
> +  }
> +  public Map<String, List<String>> getFamilyMap() {
> +    return familyMap;
> +  }
> +
> +  /**
> +   * @param family the family name
> +   * @return an array of the query column names belonging to the family
> +   */
> +  public String[] getColumns(String family) {
> +
> +    List<String> columnList = familyMap.get(family);
> +    String[] columns = new String[columnList.size()];
> +    for (int i = 0; i < columns.length; ++i) {
> +      columns[i] = columnList.get(i);
> +    }
> +    return columns;
> +  }
> +  public Query<K, T> getQuery() {
> +    return query;
> +  }
> +  public void setQuery(Query<K, T> query) {
> +    this.query = query;
> +  }
> +
> +
> +
>  }
>
> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java (original)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java Fri Jul 22 00:33:59 2011
> @@ -1,157 +1,97 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - *     http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
>  package org.apache.gora.cassandra.query;
>
>  import java.io.IOException;
> -import java.net.InetAddress;
> -import java.net.UnknownHostException;
> -import java.util.Iterator;
> -
> -import org.apache.gora.cassandra.client.CassandraClient;
> -import org.apache.gora.cassandra.client.Row;
> -import org.apache.gora.cassandra.client.Select;
> -import org.apache.gora.cassandra.store.CassandraStore;
> +import java.util.List;
> +import java.util.Map;
> +
> +import org.apache.avro.Schema;
> +import org.apache.avro.Schema.Field;
>  import org.apache.gora.persistency.Persistent;
>  import org.apache.gora.query.Query;
>  import org.apache.gora.query.impl.ResultBase;
>  import org.apache.gora.store.DataStore;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
>
> -public class CassandraResult<K, T extends Persistent>
> -extends ResultBase<K, T> {
> -
> -  private Iterator<Row> rowIter;
> -
> -  private CassandraStore<K, T> store;
> +public class CassandraResult<K, T extends Persistent> extends ResultBase<K, T> {
> +  public static final Logger LOG = LoggerFactory.getLogger(CassandraResult.class);
> +
> +  private int rowNumber;
> +
> +  private CassandraResultSet cassandraResultSet;
> +
> +  /**
> +   * Maps Cassandra columns to Avro fields.
> +   */
> +  private Map<String, String> reverseMap;
>
> -  private String[] fields;
> -
> -  public CassandraResult(DataStore<K, T> dataStore, Query<K, T> query,
> -      int batchRowCount) throws IOException {
> +  public CassandraResult(DataStore<K, T> dataStore, Query<K, T> query) {
>     super(dataStore, query);
> -
> -    store = (CassandraStore<K, T>) dataStore;
> -    fields = query.getFields();
> -
> -    boolean isUsingTokens = (query instanceof CassandraPartitionQuery);
> -    String startTokenOrKey;
> -    String endTokenOrKey;
> -
> -    if (isUsingTokens) {
> -      CassandraPartitionQuery<K, T> partitionQuery = (CassandraPartitionQuery<K, T>) query;
> -      startTokenOrKey = partitionQuery.getStartToken();
> -      endTokenOrKey = partitionQuery.getEndToken();
> -    } else {
> -      CassandraQuery<K, T> cassandraQuery = (CassandraQuery<K, T>) query;
> -      startTokenOrKey = cassandraQuery.getStartKey().toString();
> -      endTokenOrKey = cassandraQuery.getEndKey().toString();
> -    }
> -
> -    Select select = store.createSelect(fields);
> -
> -    CassandraClient client = store.getClientByLocation(getLocation(query));
> -    if (isUsingTokens) {
> -      rowIter =
> -        client.getTokenRange(startTokenOrKey, endTokenOrKey,
> -            batchRowCount, select).iterator();
> -    } else {
> -      rowIter = client.getRange(startTokenOrKey, endTokenOrKey,
> -          batchRowCount, select).iterator();
> -    }
> -  }
> -
> -  @Override
> -  public float getProgress() throws IOException {
> -    return 0;
>   }
>
>   @Override
>   protected boolean nextInner() throws IOException {
> -    if (!rowIter.hasNext()) {
> -      return false;
> -    }
> -    Row row = rowIter.next();
> -    if (row == null) {
> -      return false;
> +    if (this.rowNumber < this.cassandraResultSet.size()) {
> +      updatePersistent();
>     }
> -
> -    key = toKey(row.getKey());
> -    persistent = store.newInstance(row, fields);
> -
> -    return true;
> +    ++this.rowNumber;
> +    return (this.rowNumber <= this.cassandraResultSet.size());
>   }
>
> +
> +  /**
> +   * Load key/value pair from Cassandra row to Avro record.
> +   * @throws IOException
> +   */
>   @SuppressWarnings("unchecked")
> -  private K toKey(String keyStr) {
> -    Class<K> keyClass = dataStore.getKeyClass();
> -    if (keyClass.isAssignableFrom(String.class)) {
> -      return (K) keyStr;
> -    }
> -    if (keyClass.isAssignableFrom(Integer.class)) {
> -      return (K) (Integer) Integer.parseInt(keyStr);
> -    }
> -    if (keyClass.isAssignableFrom(Float.class)) {
> -      return (K) (Float) Float.parseFloat(keyStr);
> -    }
> -    if (keyClass.isAssignableFrom(Double.class)) {
> -      return (K) (Double) Double.parseDouble(keyStr);
> -    }
> -    if (keyClass.isAssignableFrom(Long.class)) {
> -      return (K) (Long) Long.parseLong(keyStr);
> -    }
> -    if (keyClass.isAssignableFrom(Short.class)) {
> -      return (K) (Short) Short.parseShort(keyStr);
> -    }
> -    if (keyClass.isAssignableFrom(Byte.class)) {
> -      return (K) (Byte) Byte.parseByte(keyStr);
> +  private void updatePersistent() throws IOException {
> +    CassandraRow cassandraRow = this.cassandraResultSet.get(this.rowNumber);
> +
> +    // load key
> +    this.key = (K) cassandraRow.getKey();
> +
> +    // load value
> +    Schema schema = this.persistent.getSchema();
> +    List<Field> fields = schema.getFields();
> +
> +    for (CassandraColumn cassandraColumn: cassandraRow) {
> +
> +      // get field name
> +      String family = cassandraColumn.getFamily();
> +      String fieldName = this.reverseMap.get(family + ":" + cassandraColumn.getName());
> +
> +      // get field
> +      int pos = this.persistent.getFieldIndex(fieldName);
> +      Field field = fields.get(pos);
> +
> +      // get value
> +      cassandraColumn.setField(field);
> +      Object value = cassandraColumn.getValue();
> +
> +      this.persistent.put(pos, value);
> +      // this field does not need to be written back to the store
> +      this.persistent.clearDirty(pos);
>     }
>
> -    throw new RuntimeException("Can't parse " + keyStr +
> -                               " as an instance of " + keyClass);
>   }
>
>   @Override
> -  public void close() throws IOException { }
> +  public void close() throws IOException {
> +    // TODO Auto-generated method stub
> +
> +  }
>
> -  private String getLocation(Query<K, T> query) {
> -    if (!(query instanceof CassandraPartitionQuery)) {
> -      return null;
> -    }
> -    CassandraPartitionQuery<K, T> partitonQuery =
> -      (CassandraPartitionQuery<K, T>) query;
> -    InetAddress[] localAddresses = new InetAddress[0];
> -    try {
> -      localAddresses = InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress());
> -    } catch (UnknownHostException e) {
> -      throw new AssertionError(e);
> -    }
> -    for (InetAddress address : localAddresses) {
> -      for (String location : partitonQuery.getEndPoints()) {
> -        InetAddress locationAddress = null;
> -        try {
> -          locationAddress = InetAddress.getByName(location);
> -        } catch (UnknownHostException e) {
> -          throw new AssertionError(e);
> -        }
> -        if (address.equals(locationAddress)) {
> -          return location;
> -        }
> -      }
> -    }
> -    return partitonQuery.getEndPoints()[0];
> +  @Override
> +  public float getProgress() throws IOException {
> +    return (((float) this.rowNumber) / this.cassandraResultSet.size());
>   }
> -}
> \ No newline at end of file
> +
> +  public void setResultSet(CassandraResultSet cassandraResultSet) {
> +    this.cassandraResultSet = cassandraResultSet;
> +  }
> +
> +  public void setReverseMap(Map<String, String> reverseMap) {
> +    this.reverseMap = reverseMap;
> +  }
> +
> +}
>
> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java?rev=1149420&view=auto
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java (added)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java Fri Jul 22 00:33:59 2011
> @@ -0,0 +1,36 @@
> +package org.apache.gora.cassandra.query;
> +
> +import java.util.ArrayList;
> +import java.util.HashMap;
> +
> +/**
> + * List data structure to keep the order coming from the Cassandra selects.
> + */
> +public class CassandraResultSet extends ArrayList<CassandraRow> {
> +
> +  /**
> +   *
> +   */
> +  private static final long serialVersionUID = -7620939600192859652L;
> +
> +  /**
> +   * Maps keys to indices in the list.
> +   */
> +  private HashMap<String, Integer> indexMap = new HashMap<String, Integer>();
> +
> +  public CassandraRow getRow(String key) {
> +    Integer integer = this.indexMap.get(key);
> +    if (integer == null) {
> +      return null;
> +    }
> +
> +    return this.get(integer);
> +  }
> +
> +  public void putRow(String key, CassandraRow cassandraRow) {
> +    this.add(cassandraRow);
> +    this.indexMap.put(key, this.size()-1);
> +  }
> +
> +
> +}
> \ No newline at end of file
>
> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java?rev=1149420&view=auto
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java (added)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java Fri Jul 22 00:33:59 2011
> @@ -0,0 +1,24 @@
> +package org.apache.gora.cassandra.query;
> +
> +import java.util.ArrayList;
> +
> +/**
> + * List of key value pairs representing a row, tagged by a key.
> + */
> +public class CassandraRow extends ArrayList<CassandraColumn> {
> +
> +  /**
> +   *
> +   */
> +  private static final long serialVersionUID = -7620939600192859652L;
> +  private String key;
> +
> +  public String getKey() {
> +    return this.key;
> +  }
> +
> +  public void setKey(String key) {
> +    this.key = key;
> +  }
> +
> +}
>
> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1149420&view=auto
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java (added)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java Fri Jul 22 00:33:59 2011
> @@ -0,0 +1,104 @@
> +package org.apache.gora.cassandra.query;
> +
> +import java.nio.ByteBuffer;
> +import java.nio.CharBuffer;
> +import java.nio.charset.CharacterCodingException;
> +import java.nio.charset.Charset;
> +import java.nio.charset.CharsetEncoder;
> +
> +import me.prettyprint.hector.api.beans.HColumn;
> +
> +import org.apache.avro.Schema;
> +import org.apache.avro.Schema.Field;
> +import org.apache.avro.Schema.Type;
> +import org.apache.avro.generic.GenericArray;
> +import org.apache.avro.generic.GenericData;
> +import org.apache.avro.util.Utf8;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +public class CassandraSubColumn extends CassandraColumn {
> +  public static final Logger LOG = LoggerFactory.getLogger(CassandraSubColumn.class);
> +
> +  private static final String ENCODING = "UTF-8";
> +
> +  private static CharsetEncoder charsetEncoder = Charset.forName(ENCODING).newEncoder();
> +
> +
> +  /**
> +   * Key-value pair containing the raw data.
> +   */
> +  private HColumn<String, String> hColumn;
> +
> +  public String getName() {
> +    return hColumn.getName();
> +  }
> +
> +  /**
> +   * Deserialize a String into a typed Object, according to the field schema.
> +   * @see org.apache.gora.cassandra.query.CassandraColumn#getValue()
> +   */
> +  public Object getValue() {
> +    Field field = getField();
> +    Schema fieldSchema = field.schema();
> +    Type type = fieldSchema.getType();
> +    String valueString = hColumn.getValue();
> +    Object value = null;
> +
> +    switch (type) {
> +      case STRING:
> +        value = new Utf8(valueString);
> +        break;
> +      case BYTES:
> +        // convert string to bytebuffer
> +        value = getByteBuffer(valueString);
> +        break;
> +      case INT:
> +        value = Integer.parseInt(valueString);
> +        break;
> +      case LONG:
> +        value = Long.parseLong(valueString);
> +        break;
> +      case FLOAT:
> +        value = Float.parseFloat(valueString);
> +        break;
> +      case ARRAY:
> +        // convert string to array
> +        valueString = valueString.substring(1, valueString.length()-1);
> +        String[] elements = valueString.split(", ");
> +
> +        Type elementType = fieldSchema.getElementType().getType();
> +        if (elementType == Schema.Type.STRING) {
> +          // the array type is String
> +          GenericArray<String> genericArray = new GenericData.Array<String>(elements.length, Schema.createArray(Schema.create(Schema.Type.STRING)));
> +          for (String element: elements) {
> +            genericArray.add(element);
> +          }
> +
> +          value = genericArray;
> +        } else {
> +          LOG.info("Element type not supported: " + elementType);
> +        }
> +        break;
> +      default:
> +        LOG.info("Type not supported: " + type);
> +    }
> +
> +    return value;
> +
> +  }
> +
> +  public void setValue(HColumn<String, String> hColumn) {
> +    this.hColumn = hColumn;
> +  }
> +
> +  public static ByteBuffer getByteBuffer(String valueString) {
> +    ByteBuffer byteBuffer = null;
> +    try {
> +      byteBuffer = charsetEncoder.encode(CharBuffer.wrap(valueString));
> +    } catch (CharacterCodingException cce) {
> +      LOG.warn("Unable to encode " + valueString + " into " + ENCODING);
> +    }
> +    return byteBuffer;
> +  }
> +}
>
> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1149420&view=auto
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java (added)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java Fri Jul 22 00:33:59 2011
> @@ -0,0 +1,102 @@
> +package org.apache.gora.cassandra.query;
> +
> +import java.util.Map;
> +
> +import me.prettyprint.hector.api.beans.HColumn;
> +import me.prettyprint.hector.api.beans.HSuperColumn;
> +
> +import org.apache.avro.Schema;
> +import org.apache.avro.Schema.Field;
> +import org.apache.avro.Schema.Type;
> +import org.apache.avro.util.Utf8;
> +import org.apache.gora.persistency.StatefulHashMap;
> +import org.apache.gora.persistency.impl.PersistentBase;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +public class CassandraSuperColumn extends CassandraColumn {
> +  public static final Logger LOG = LoggerFactory.getLogger(CassandraSuperColumn.class);
> +
> +  private HSuperColumn<String, String, String> hSuperColumn;
> +
> +  public String getName() {
> +    return hSuperColumn.getName();
> +  }
> +
> +  public Object getValue() {
> +    Field field = getField();
> +    Schema fieldSchema = field.schema();
> +    Type type = fieldSchema.getType();
> +
> +    Object value = null;
> +
> +    switch (type) {
> +      case MAP:
> +        Map<Utf8, Object> map = new StatefulHashMap<Utf8, Object>();
> +        Type valueType = fieldSchema.getValueType().getType();
> +
> +        for (HColumn<String, String> hColumn : this.hSuperColumn.getColumns()) {
> +          String memberString = hColumn.getValue();
> +          Object memberValue = null;
> +          switch (valueType) {
> +            case STRING:
> +              memberValue = new Utf8(memberString);
> +              break;
> +            case BYTES:
> +              memberValue = CassandraSubColumn.getByteBuffer(memberString);
> +              break;
> +            default:
> +              LOG.info("Type for the map value is not supported: " + valueType);
> +
> +          }
> +          map.put(new Utf8(hColumn.getName()), memberValue);
> +        }
> +        value = map;
> +
> +        break;
> +      case RECORD:
> +        String fullName = fieldSchema.getFullName();
> +
> +        Class<?> claz = null;
> +        try {
> +          claz = Class.forName(fullName);
> +        } catch (ClassNotFoundException cnfe) {
> +          LOG.warn("Unable to load class " + fullName, cnfe);
> +          break;
> +        }
> +
> +        try {
> +          value = claz.newInstance();
> +        } catch (InstantiationException ie) {
> +          LOG.warn("Instantiation error", ie);
> +          break;
> +        } catch (IllegalAccessException iae) {
> +          LOG.warn("Illegal access error", iae);
> +          break;
> +        }
> +
> +        // we updated the value instance, now update its members
> +        if (value instanceof PersistentBase) {
> +          PersistentBase record = (PersistentBase) value;
> +
> +          for (HColumn<String, String> hColumn : this.hSuperColumn.getColumns()) {
> +            Field memberField = fieldSchema.getField(hColumn.getName());
> +            CassandraSubColumn cassandraColumn = new CassandraSubColumn();
> +            cassandraColumn.setField(memberField);
> +            cassandraColumn.setValue(hColumn);
> +            record.put(record.getFieldIndex(hColumn.getName()), cassandraColumn.getValue());
> +          }
> +        }
> +        break;
> +      default:
> +        LOG.info("Type not supported: " + type);
> +    }
> +
> +    return value;
> +  }
> +
> +  public void setValue(HSuperColumn<String, String, String> hSuperColumn) {
> +    this.hSuperColumn = hSuperColumn;
> +  }
> +
> +}
>
> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1149420&view=auto
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (added)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Fri Jul 22 00:33:59 2011
> @@ -0,0 +1,253 @@
> +package org.apache.gora.cassandra.store;
> +
> +import java.nio.ByteBuffer;
> +import java.util.ArrayList;
> +import java.util.Arrays;
> +import java.util.HashMap;
> +import java.util.List;
> +import java.util.Map;
> +
> +import me.prettyprint.cassandra.serializers.StringSerializer;
> +import me.prettyprint.cassandra.service.CassandraHostConfigurator;
> +import me.prettyprint.hector.api.Cluster;
> +import me.prettyprint.hector.api.Keyspace;
> +import me.prettyprint.hector.api.Serializer;
> +import me.prettyprint.hector.api.beans.OrderedRows;
> +import me.prettyprint.hector.api.beans.OrderedSuperRows;
> +import me.prettyprint.hector.api.beans.Row;
> +import me.prettyprint.hector.api.beans.SuperRow;
> +import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
> +import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
> +import me.prettyprint.hector.api.factory.HFactory;
> +import me.prettyprint.hector.api.mutation.Mutator;
> +import me.prettyprint.hector.api.query.QueryResult;
> +import me.prettyprint.hector.api.query.RangeSlicesQuery;
> +import me.prettyprint.hector.api.query.RangeSuperSlicesQuery;
> +
> +import org.apache.gora.cassandra.query.CassandraQuery;
> +import org.apache.gora.mapreduce.GoraRecordReader;
> +import org.apache.gora.persistency.Persistent;
> +import org.apache.gora.query.Query;
> +import org.apache.gora.util.ByteUtils;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +public class CassandraClient<K, T extends Persistent> {
> +  public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);
> +
> +  private Cluster cluster;
> +  private Keyspace keyspace;
> +  private Mutator<String> mutator;
> +
> +  private CassandraMapping cassandraMapping = new CassandraMapping();
> +
> +  private Serializer<String> stringSerializer = new StringSerializer();
> +
> +  public void init() throws Exception {
> +    this.cassandraMapping.loadConfiguration();
> +    this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), new CassandraHostConfigurator(this.cassandraMapping.getHostName()));
> +
> +    // add keyspace to cluster
> +    checkKeyspace();
> +
> +    // Just create a Keyspace object on the client side, corresponding to an already existing keyspace with already created column families.
> +    this.keyspace = HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster);
> +
> +    this.mutator = HFactory.createMutator(this.keyspace, this.stringSerializer);
> +  }
> +
> +  /**
> +   * Check if keyspace already exists. If not, create it.
> +   */
> +  public void checkKeyspace() {
> +    // "describe keyspace <keyspaceName>;" query
> +    KeyspaceDefinition keyspaceDefinition = this.cluster.describeKeyspace(this.cassandraMapping.getKeyspaceName());
> +    if (keyspaceDefinition == null) {
> +      List<ColumnFamilyDefinition> columnFamilyDefinitions = this.cassandraMapping.getColumnFamilyDefinitions();
> +      keyspaceDefinition = HFactory.createKeyspaceDefinition(this.cassandraMapping.getKeyspaceName(), "org.apache.cassandra.locator.SimpleStrategy", 1, columnFamilyDefinitions);
> +      this.cluster.addKeyspace(keyspaceDefinition);
> +      LOG.info("Keyspace '" + this.cassandraMapping.getKeyspaceName() + "' in cluster '" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName() + "'");
> +
> +      keyspaceDefinition = null;
> +    }
> +
> +
> +  }
> +
> +  /**
> +   * Drop keyspace.
> +   */
> +  public void dropKeyspace() {
> +    // "drop keyspace <keyspaceName>;" query
> +    this.cluster.dropKeyspace(this.cassandraMapping.getKeyspaceName());
> +  }
> +
> +  /**
> +   * Insert a field in a column.
> +   * @param key the row key
> +   * @param fieldName the field name
> +   * @param value the field value.
> +   */
> +  public void addColumn(String key, String fieldName, Object value) {
> +    if (value == null) {
> +      return;
> +    }
> +    if (value instanceof ByteBuffer) {
> +      value = toString((ByteBuffer) value);
> +    }
> +
> +    String columnFamily = this.cassandraMapping.getFamily(fieldName);
> +    String columnName = this.cassandraMapping.getColumn(fieldName);
> +
> +    this.mutator.insert(key, columnFamily, HFactory.createStringColumn(columnName, value.toString()));
> +  }
> +
> +  /**
> +   * TODO do not convert bytes to string to store a binary field
> +   * @param value
> +   * @return
> +   */
> +  private static String toString(ByteBuffer value) {
> +    ByteBuffer byteBuffer = (ByteBuffer) value;
> +    return ByteUtils.toString(byteBuffer.array(), 0, byteBuffer.limit());
> +  }
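
On toString(ByteBuffer) above: it reads byteBuffer.array() from index 0 up to limit(), so the buffer's position and array offset are ignored, and array() throws for read-only or direct buffers. A minimal sketch of a safer copy, assuming the text encoding is UTF-8 (my assumption, not something the commit states). The class and method names are hypothetical, and StandardCharsets needs Java 7 or later.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the commit.
final class ByteBufferText {
  private ByteBufferText() {}

  /** Copies the bytes between position and limit without moving the caller's position. */
  static byte[] remainingBytes(ByteBuffer buffer) {
    // duplicate() shares the content but has its own position and limit
    ByteBuffer view = buffer.duplicate();
    byte[] bytes = new byte[view.remaining()];
    view.get(bytes);
    return bytes;
  }

  /** Decodes the remaining bytes as UTF-8 (assumed encoding). */
  static String toUtf8String(ByteBuffer buffer) {
    return new String(remainingBytes(buffer), StandardCharsets.UTF_8);
  }
}
```

This still goes through a String, so arbitrary binary data that is not valid UTF-8 will not round-trip, which is what the TODO is about. Storing remainingBytes() directly with a bytes serializer would avoid that.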
> +
> +  /**
> +   * Insert a member in a super column. This is used for map and record Avro types.
> +   * @param key the row key
> +   * @param fieldName the field name
> +   * @param memberName the member name
> +   * @param value the member value
> +   */
> +  public void addSubColumn(String key, String fieldName, String memberName, Object value) {
> +    if (value == null) {
> +      return;
> +    }
> +
> +    if (value instanceof ByteBuffer) {
> +      value = toString((ByteBuffer) value);
> +    }
> +
> +    String columnFamily = this.cassandraMapping.getFamily(fieldName);
> +    String superColumnName = this.cassandraMapping.getColumn(fieldName);
> +
> +    this.mutator.insert(key, columnFamily, HFactory.createSuperColumn(superColumnName, Arrays.asList(HFactory.createStringColumn(memberName, value.toString())), this.stringSerializer, this.stringSerializer, this.stringSerializer));
> +
> +  }
> +
> +  /**
> +   * Select a column family in the keyspace.
> +   * @param cassandraQuery a wrapper of the query
> +   * @param family the family name to be queried
> +   * @return a list of family rows
> +   */
> +  public List<Row<String, String, String>> execute(CassandraQuery<K, T> cassandraQuery, String family) {
> +
> +    String[] columnNames = cassandraQuery.getColumns(family);
> +    Query<K, T> query = cassandraQuery.getQuery();
> +    int limit = (int) query.getLimit();
> +    String startKey = (String) query.getStartKey();
> +    String endKey = (String) query.getEndKey();
> +
> +    if (startKey == null) {
> +      startKey = "";
> +    }
> +    if (endKey == null) {
> +      endKey = "";
> +    }
> +
> +
> +    RangeSlicesQuery<String, String, String> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, this.stringSerializer, stringSerializer, stringSerializer);
> +    rangeSlicesQuery.setColumnFamily(family);
> +    rangeSlicesQuery.setKeys(startKey, endKey);
> +    rangeSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
> +    rangeSlicesQuery.setRowCount(limit);
> +    rangeSlicesQuery.setColumnNames(columnNames);
> +
> +
> +    QueryResult<OrderedRows<String, String, String>> queryResult = rangeSlicesQuery.execute();
> +    OrderedRows<String, String, String> orderedRows = queryResult.get();
> +
> +
> +    return orderedRows.getList();
> +  }
> +
> +  /**
> +   * Select the families that contain at least one column mapped to a query field.
> +   * @param query indicates the columns to select
> +   * @return a map whose keys are the family names and whose values are the column names required to get all the query fields.
> +   */
> +  public Map<String, List<String>> getFamilyMap(Query<K, T> query) {
> +    Map<String, List<String>> map = new HashMap<String, List<String>>();
> +    for (String field: query.getFields()) {
> +      String family = this.cassandraMapping.getFamily(field);
> +      String column = this.cassandraMapping.getColumn(field);
> +
> +      // check if the family value was already initialized
> +      List<String> list = map.get(family);
> +      if (list == null) {
> +        list = new ArrayList<String>();
> +        map.put(family, list);
> +      }
> +
> +      if (column != null) {
> +        list.add(column);
> +      }
> +
> +    }
> +
> +    return map;
> +  }
> +
> +  /**
> +   * Map the fully qualified column names, formatted as "family:column", to the field names.
> +   * @param query the query whose fields are mapped
> +   * @return a map whose keys are the fully qualified column names and whose values are the query fields
> +   */
> +  public Map<String, String> getReverseMap(Query<K, T> query) {
> +    Map<String, String> map = new HashMap<String, String>();
> +    for (String field: query.getFields()) {
> +      String family = this.cassandraMapping.getFamily(field);
> +      String column = this.cassandraMapping.getColumn(field);
> +
> +      map.put(family + ":" + column, field);
> +    }
> +
> +    return map;
> +
> +  }
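
One thing I noticed in getFamilyMap and getReverseMap: a field with no entry in the mapping ends up under a null family key and a "null:null" reverse key. A rough sketch of the same grouping that skips unmapped fields, using plain maps in place of CassandraMapping. The class name, method names, and the two lookup-map parameters are hypothetical stand-ins, and skipping (rather than failing) is just one possible policy.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not part of the commit.
final class FamilyMapSketch {
  private FamilyMapSketch() {}

  /** Groups the columns of the requested fields by column family. */
  static Map<String, List<String>> familyMap(String[] fields,
      Map<String, String> familyOf, Map<String, String> columnOf) {
    Map<String, List<String>> map = new LinkedHashMap<String, List<String>>();
    for (String field : fields) {
      String family = familyOf.get(field);
      if (family == null) {
        continue; // unmapped field: skipped instead of creating a null key
      }
      List<String> columns = map.get(family);
      if (columns == null) {
        columns = new ArrayList<String>();
        map.put(family, columns);
      }
      String column = columnOf.get(field);
      if (column != null) {
        columns.add(column);
      }
    }
    return map;
  }

  /** Maps "family:column" back to the field name, skipping unmapped fields. */
  static Map<String, String> reverseMap(String[] fields,
      Map<String, String> familyOf, Map<String, String> columnOf) {
    Map<String, String> map = new LinkedHashMap<String, String>();
    for (String field : fields) {
      String family = familyOf.get(field);
      String column = columnOf.get(field);
      if (family != null && column != null) {
        map.put(family + ":" + column, field);
      }
    }
    return map;
  }
}
```

Both methods in the commit also iterate query.getFields() directly, so a null field array would need a guard as well if callers can pass one.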
> +
> +  public boolean isSuper(String family) {
> +    return this.cassandraMapping.isSuper(family);
> +  }
> +
> +  public List<SuperRow<String, String, String, String>> executeSuper(CassandraQuery<K, T> cassandraQuery, String family) {
> +    String[] columnNames = cassandraQuery.getColumns(family);
> +    Query<K, T> query = cassandraQuery.getQuery();
> +    int limit = (int) query.getLimit();
> +    String startKey = (String) query.getStartKey();
> +    String endKey = (String) query.getEndKey();
> +
> +    if (startKey == null) {
> +      startKey = "";
> +    }
> +    if (endKey == null) {
> +      endKey = "";
> +    }
> +
> +
> +    RangeSuperSlicesQuery<String, String, String, String> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace, this.stringSerializer, this.stringSerializer, this.stringSerializer, this.stringSerializer);
> +    rangeSuperSlicesQuery.setColumnFamily(family);
> +    rangeSuperSlicesQuery.setKeys(startKey, endKey);
> +    rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
> +    rangeSuperSlicesQuery.setRowCount(limit);
> +    rangeSuperSlicesQuery.setColumnNames(columnNames);
> +
> +
> +    QueryResult<OrderedSuperRows<String, String, String, String>> queryResult = rangeSuperSlicesQuery.execute();
> +    OrderedSuperRows<String, String, String, String> orderedRows = queryResult.get();
> +    return orderedRows.getList();
> +
> +
> +  }
> +}
>
> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java (original)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java Fri Jul 22 00:33:59 2011
> @@ -1,50 +1,155 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - *     http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
>  package org.apache.gora.cassandra.store;
>
> +import java.io.IOException;
> +import java.util.ArrayList;
>  import java.util.HashMap;
> +import java.util.List;
>  import java.util.Map;
> -import java.util.Set;
> +
> +import me.prettyprint.cassandra.model.BasicColumnFamilyDefinition;
> +import me.prettyprint.cassandra.service.ThriftCfDef;
> +import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
> +import me.prettyprint.hector.api.ddl.ColumnType;
> +import me.prettyprint.hector.api.ddl.ComparatorType;
> +
> +import org.jdom.Document;
> +import org.jdom.Element;
> +import org.jdom.JDOMException;
> +import org.jdom.input.SAXBuilder;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
>
>  public class CassandraMapping {
> +
> +  public static final Logger LOG = LoggerFactory.getLogger(CassandraMapping.class);
> +
> +  private static final String MAPPING_FILE = "gora-cassandra-mapping.xml";
> +  private static final String KEYSPACE_ELEMENT = "keyspace";
> +  private static final String NAME_ATTRIBUTE = "name";
> +  private static final String MAPPING_ELEMENT = "class";
> +  private static final String COLUMN_ATTRIBUTE = "qualifier";
> +  private static final String FAMILY_ATTRIBUTE = "family";
> +  private static final String SUPER_ATTRIBUTE = "type";
> +  private static final String CLUSTER_ATTRIBUTE = "cluster";
> +  private static final String HOST_ATTRIBUTE = "host";
> +
> +
> +  private String hostName;
> +  private String clusterName;
> +  private String keyspaceName;
> +
> +
> +  /**
> +   * List of the super column families.
> +   */
> +  private List<String> superFamilies = new ArrayList<String>();
> +
> +  /**
> +   * Look up the column family associated with the Avro field.
> +   */
> +  private Map<String, String> familyMap = new HashMap<String, String>();
> +
> +  /**
> +   * Look up the column associated with the Avro field.
> +   */
> +  private Map<String, String> columnMap = new HashMap<String, String>();
> +
> +  /**
> +   * Look up the column family from its name.
> +   */
> +  private Map<String, BasicColumnFamilyDefinition> columnFamilyDefinitions = new HashMap<String, BasicColumnFamilyDefinition>();
>
> -  private String keySpace;
> +  public String getHostName() {
> +    return this.hostName;
> +  }
>
> -  private Map<String, Boolean> families =
> -    new HashMap<String, Boolean>();
> +  public String getClusterName() {
> +    return this.clusterName;
> +  }
>
> -  public String getKeySpace() {
> -    return keySpace;
> +  public String getKeyspaceName() {
> +    return this.keyspaceName;
>   }
>
> -  public void setKeySpace(String keySpace) {
> -    this.keySpace = keySpace;
> +
> +  @SuppressWarnings("unchecked")
> +  public void loadConfiguration() throws JDOMException, IOException {
> +    SAXBuilder saxBuilder = new SAXBuilder();
> +    Document document = saxBuilder.build(getClass().getClassLoader().getResourceAsStream(MAPPING_FILE));
> +    Element root = document.getRootElement();
> +
> +    Element keyspace = root.getChild(KEYSPACE_ELEMENT);
> +    this.keyspaceName = keyspace.getAttributeValue(NAME_ATTRIBUTE);
> +    this.clusterName = keyspace.getAttributeValue(CLUSTER_ATTRIBUTE);
> +    this.hostName = keyspace.getAttributeValue(HOST_ATTRIBUTE);
> +
> +    // load column family definitions
> +    List<Element> elements = keyspace.getChildren();
> +    for (Element element: elements) {
> +      BasicColumnFamilyDefinition cfDef = new BasicColumnFamilyDefinition();
> +
> +      String familyName = element.getAttributeValue(NAME_ATTRIBUTE);
> +
> +      String superAttribute = element.getAttributeValue(SUPER_ATTRIBUTE);
> +      if (superAttribute != null) {
> +        this.superFamilies.add(familyName);
> +        cfDef.setColumnType(ColumnType.SUPER);
> +        cfDef.setSubComparatorType(ComparatorType.UTF8TYPE);
> +      }
> +
> +      cfDef.setKeyspaceName(this.keyspaceName);
> +      cfDef.setName(familyName);
> +      cfDef.setComparatorType(ComparatorType.UTF8TYPE);
> +      cfDef.setDefaultValidationClass(ComparatorType.UTF8TYPE.getClassName());
> +
> +      this.columnFamilyDefinitions.put(familyName, cfDef);
> +
> +    }
> +
> +    // load column definitions
> +    Element mapping = root.getChild(MAPPING_ELEMENT);
> +    elements = mapping.getChildren();
> +    for (Element element: elements) {
> +      String fieldName = element.getAttributeValue(NAME_ATTRIBUTE);
> +      String familyName = element.getAttributeValue(FAMILY_ATTRIBUTE);
> +      String columnName = element.getAttributeValue(COLUMN_ATTRIBUTE);
> +      BasicColumnFamilyDefinition columnFamilyDefinition = this.columnFamilyDefinitions.get(familyName);
> +      if (columnFamilyDefinition == null) {
> +        LOG.warn("Family " + familyName + " was not declared in the keyspace.");
> +      }
> +
> +      this.familyMap.put(fieldName, familyName);
> +      this.columnMap.put(fieldName, columnName);
> +
> +    }
>   }
>
> -  public Set<String> getColumnFamilies() {
> -    return families.keySet();
> +  public String getFamily(String name) {
> +    return this.familyMap.get(name);
>   }
>
> -  public void addColumnFamily(String columnFamily, boolean isSuper) {
> -    families.put(columnFamily, isSuper);
> +  public String getColumn(String name) {
> +    return this.columnMap.get(name);
>   }
>
> -  public boolean isColumnFamilySuper(String columnFamily) {
> -    return families.get(columnFamily);
> +  /**
> +   * Read family super attribute.
> +   * @param family the family name
> +   * @return true if the family is a super column family
> +   */
> +  public boolean isSuper(String family) {
> +    return this.superFamilies.indexOf(family) != -1;
>   }
> +
> +  public List<ColumnFamilyDefinition> getColumnFamilyDefinitions() {
> +    List<ColumnFamilyDefinition> list = new ArrayList<ColumnFamilyDefinition>();
> +    for (String key: this.columnFamilyDefinitions.keySet()) {
> +      ColumnFamilyDefinition columnFamilyDefinition = this.columnFamilyDefinitions.get(key);
> +      ThriftCfDef thriftCfDef = new ThriftCfDef(columnFamilyDefinition);
> +      list.add(thriftCfDef);
> +    }
> +
> +    return list;
> +  }
> +
>  }
>
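
A smaller point on loadConfiguration(): getResourceAsStream() returns null when gora-cassandra-mapping.xml is not on the classpath, and that null goes straight into the SAX builder, so I would expect a confusing failure rather than a clear message. A minimal sketch of a guard, using only the JDK. The class and method names are hypothetical, and FileNotFoundException is chosen here because it is an IOException, which loadConfiguration() already declares.

```java
import java.io.FileNotFoundException;
import java.io.InputStream;

// Hypothetical helper, not part of the commit.
final class RequiredResource {
  private RequiredResource() {}

  /** Opens a classpath resource or fails with a message naming the missing file. */
  static InputStream open(ClassLoader loader, String name) throws FileNotFoundException {
    InputStream stream = loader.getResourceAsStream(name);
    if (stream == null) {
      throw new FileNotFoundException("Mapping file not found on classpath: " + name);
    }
    return stream;
  }
}
```

The same kind of check would help for the "keyspace" and "class" elements, since root.getChild() returns null when an element is absent.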
> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Fri Jul 22 00:33:59 2011
> @@ -1,465 +1,334 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - *     http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
>  package org.apache.gora.cassandra.store;
>
>  import java.io.IOException;
> -import java.net.InetAddress;
>  import java.util.ArrayList;
>  import java.util.HashMap;
>  import java.util.List;
>  import java.util.Map;
> -import java.util.Map.Entry;
> -import java.util.Properties;
> -import java.util.Set;
> +
> +import me.prettyprint.hector.api.beans.ColumnSlice;
> +import me.prettyprint.hector.api.beans.HColumn;
> +import me.prettyprint.hector.api.beans.HSuperColumn;
> +import me.prettyprint.hector.api.beans.Row;
> +import me.prettyprint.hector.api.beans.SuperRow;
> +import me.prettyprint.hector.api.beans.SuperSlice;
>
>  import org.apache.avro.Schema;
>  import org.apache.avro.Schema.Field;
>  import org.apache.avro.Schema.Type;
>  import org.apache.avro.generic.GenericArray;
>  import org.apache.avro.util.Utf8;
> -import org.apache.cassandra.thrift.TokenRange;
> -import org.apache.gora.cassandra.client.CassandraClient;
> -import org.apache.gora.cassandra.client.Mutate;
> -import org.apache.gora.cassandra.client.Row;
> -import org.apache.gora.cassandra.client.Select;
> -import org.apache.gora.cassandra.client.SimpleCassandraClient;
> -import org.apache.gora.cassandra.query.CassandraPartitionQuery;
>  import org.apache.gora.cassandra.query.CassandraQuery;
>  import org.apache.gora.cassandra.query.CassandraResult;
> -import org.apache.gora.persistency.ListGenericArray;
> +import org.apache.gora.cassandra.query.CassandraResultSet;
> +import org.apache.gora.cassandra.query.CassandraRow;
> +import org.apache.gora.cassandra.query.CassandraSubColumn;
> +import org.apache.gora.cassandra.query.CassandraSuperColumn;
>  import org.apache.gora.persistency.Persistent;
> -import org.apache.gora.persistency.State;
> -import org.apache.gora.persistency.StateManager;
>  import org.apache.gora.persistency.StatefulHashMap;
> -import org.apache.gora.persistency.StatefulMap;
> +import org.apache.gora.persistency.impl.PersistentBase;
> +import org.apache.gora.persistency.impl.StateManagerImpl;
>  import org.apache.gora.query.PartitionQuery;
>  import org.apache.gora.query.Query;
>  import org.apache.gora.query.Result;
> -import org.apache.gora.store.DataStoreFactory;
> +import org.apache.gora.query.impl.PartitionQueryImpl;
>  import org.apache.gora.store.impl.DataStoreBase;
> -import org.apache.gora.util.ByteUtils;
> -import org.jdom.Document;
> -import org.jdom.Element;
> -import org.jdom.input.SAXBuilder;
> -
> -/**
> - * DataStore for Cassandra.
> - *
> - * <p> Note: CassandraStore is not thread-safe. </p>
> - */
> -public class CassandraStore<K, T extends Persistent>
> -extends DataStoreBase<K, T> {
> -
> -  private static final String ERROR_MESSAGE =
> -    "Cassandra does not support creating or modifying ColumnFamilies during runtime";
> -
> -  private static final String DEFAULT_MAPPING_FILE = "gora-cassandra-mapping.xml";
> -
> -  private static final int SPLIT_SIZE = 65536;
> -
> -  private static final int BATCH_COUNT = 256;
> -
> -  private CassandraClient client;
> -
> -  private Map<String, CassandraColumn> columnMap;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
>
> -  private CassandraMapping mapping;
> -
> -  @Override
> -  public void initialize(Class<K> keyClass, Class<T> persistentClass,
> -      Properties properties) throws IOException {
> -    super.initialize(keyClass, persistentClass, properties);
> -
> -    String mappingFile =
> -      DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
> -
> -    readMapping(mappingFile);
> +public class CassandraStore<K, T extends Persistent> extends DataStoreBase<K, T> {
> +  public static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
> +
> +  private CassandraClient<K, T>  cassandraClient = new CassandraClient<K, T>();
> +
> +  /**
> +   * The values are Avro fields pending to be stored.
> +   */
> +  private Map<K, T> buffer = new HashMap<K, T>();
> +
> +  public CassandraStore() throws Exception {
> +    this.cassandraClient.init();
>   }
>
>   @Override
> -  public String getSchemaName() {
> -    return mapping.getKeySpace();
> +  public void close() throws IOException {
> +    LOG.debug("close");
> +    flush();
>   }
>
>   @Override
> -  public void createSchema() throws IOException {
> -    throw new UnsupportedOperationException(ERROR_MESSAGE);
> +  public void createSchema() {
> +    LOG.debug("create schema");
> +    this.cassandraClient.checkKeyspace();
>   }
>
>   @Override
> -  public void deleteSchema() throws IOException {
> -    throw new UnsupportedOperationException(ERROR_MESSAGE);
> +  public boolean delete(K key) throws IOException {
> +    LOG.debug("delete " + key);
> +    return false;
>   }
>
>   @Override
> -  public boolean schemaExists() throws IOException {
> -    return true;
> +  public long deleteByQuery(Query<K, T> query) throws IOException {
> +    LOG.debug("delete by query " + query);
> +    return 0;
>   }
>
> -  public CassandraClient getClientByLocation(String endPoint) {
> -    return client;
> +  @Override
> +  public void deleteSchema() throws IOException {
> +    LOG.debug("delete schema");
> +    this.cassandraClient.dropKeyspace();
>   }
>
> -  public Select createSelect(String[] fields) {
> -    Select select = new Select();
> -    if (fields == null) {
> -      fields = beanFactory.getCachedPersistent().getFields();
> -    }
> -    for (String f : fields) {
> -      CassandraColumn col = columnMap.get(f);
> -      Schema fieldSchema = fieldMap.get(f).schema();
> -      switch (fieldSchema.getType()) {
> -        case MAP:
> -        case ARRAY:
> -          if (col.isSuperColumn()) {
> -            select.addAllColumnsForSuperColumn(col.family, col.superColumn);
> -          } else {
> -            select.addColumnAll(col.family);
> -          }
> -          break;
> -        default:
> -          if (col.isSuperColumn()) {
> -            select.addColumnName(col.family, col.superColumn, col.column);
> -          } else {
> -            select.addColumnName(col.family, col.column);
> -          }
> -          break;
> +  @Override
> +  public Result<K, T> execute(Query<K, T> query) throws IOException {
> +
> +    Map<String, List<String>> familyMap = this.cassandraClient.getFamilyMap(query);
> +    Map<String, String> reverseMap = this.cassandraClient.getReverseMap(query);
> +
> +    CassandraQuery<K, T> cassandraQuery = new CassandraQuery<K, T>();
> +    cassandraQuery.setQuery(query);
> +    cassandraQuery.setFamilyMap(familyMap);
> +
> +    CassandraResult<K, T> cassandraResult = new CassandraResult<K, T>(this, query);
> +    cassandraResult.setReverseMap(reverseMap);
> +
> +    CassandraResultSet cassandraResultSet = new CassandraResultSet();
> +
> +    // We query Cassandra keyspace by families.
> +    for (String family : familyMap.keySet()) {
> +      if (this.cassandraClient.isSuper(family)) {
> +        addSuperColumns(family, cassandraQuery, cassandraResultSet);
> +
> +      } else {
> +        addSubColumns(family, cassandraQuery, cassandraResultSet);
> +
>       }
> +
>     }
> -    return select;
> -  }
> +
> +    cassandraResult.setResultSet(cassandraResultSet);
> +
> +
> +    return cassandraResult;
>
> -  @Override
> -  public T get(K key, String[] fields) throws IOException {
> -    if (fields == null) {
> -      fields = beanFactory.getCachedPersistent().getFields();
> -    }
> -    Select select = createSelect(fields);
> -    try {
> -      Row result = client.get(key.toString(), select);
> -      return newInstance(result, fields);
> -    } catch (Exception e) {
> -      throw new IOException(e);
> -    }
>   }
>
> -  @SuppressWarnings("rawtypes")
> -  private void setField(T persistent, Field field, StatefulMap map) {
> -    persistent.put(field.pos(), map);
> -  }
> -
> -  private void setField(T persistent, Field field, byte[] val)
> -  throws IOException {
> -    persistent.put(field.pos()
> -        , ByteUtils.fromBytes(val, field.schema(), datumReader, persistent.get(field.pos())));
> -  }
> -
> -  @SuppressWarnings("rawtypes")
> -  private void setField(T persistent, Field field, GenericArray list) {
> -    persistent.put(field.pos(), list);
> -  }
> -
> -  @SuppressWarnings({ "rawtypes", "unchecked" })
> -  public T newInstance(Row result, String[] fields)
> -  throws IOException {
> -    if(result == null)
> -      return null;
> -
> -    T persistent = newPersistent();
> -    StateManager stateManager = persistent.getStateManager();
> -    for (String f : fields) {
> -      CassandraColumn col = columnMap.get(f);
> -      Field field = fieldMap.get(f);
> -      Schema fieldSchema = field.schema();
> -      Map<String, byte[]> qualMap;
> -      switch(fieldSchema.getType()) {
> -        case MAP:
> -          if (col.isSuperColumn()) {
> -            qualMap = result.getSuperColumn(col.family, col.superColumn);
> -          } else {
> -            qualMap = result.getColumn(col.family);
> -          }
> -          if (qualMap == null) {
> -            continue;
> -          }
> -          Schema valueSchema = fieldSchema.getValueType();
> -          StatefulMap map = new StatefulHashMap();
> -          for (Entry<String, byte[]> e : qualMap.entrySet()) {
> -            Utf8 mapKey = new Utf8(e.getKey());
> -            map.put(mapKey, ByteUtils.fromBytes(e.getValue(), valueSchema, datumReader, null));
> -            map.putState(mapKey, State.CLEAN);
> -          }
> -          setField(persistent, field, map);
> -          break;
> -        case ARRAY:
> -          if (col.isSuperColumn()) {
> -            qualMap = result.getSuperColumn(col.family, col.superColumn);
> -          } else {
> -            qualMap = result.getColumn(col.family);
> -          }
> -          if (qualMap == null) {
> -            continue;
> -          }
> -          valueSchema = fieldSchema.getElementType();
> -          ArrayList arrayList = new ArrayList();
> -          for (Entry<String, byte[]> e : qualMap.entrySet()) {
> -            arrayList.add(ByteUtils.fromBytes(e.getValue(), valueSchema, datumReader, null));
> -          }
> -          ListGenericArray arr = new ListGenericArray(fieldSchema, arrayList);
> -          setField(persistent, field, arr);
> -          break;
> -        default:
> -          byte[] val;
> -          if (col.isSuperColumn()) {
> -            val = result.get(col.family, col.superColumn, col.column);
> -          } else {
> -            val = result.get(col.family, col.column);
> -          }
> -          if (val == null) {
> -            continue;
> -          }
> -          setField(persistent, field, val);
> -          break;
> +  private void addSubColumns(String family, CassandraQuery<K, T> cassandraQuery,
> +      CassandraResultSet cassandraResultSet) {
> +    // select family columns that are included in the query
> +    List<Row<String, String, String>> rows = this.cassandraClient.execute(cassandraQuery, family);
> +
> +    for (Row<String, String, String> row : rows) {
> +      String key = row.getKey();
> +
> +      // find associated row in the resultset
> +      CassandraRow cassandraRow = cassandraResultSet.getRow(key);
> +      if (cassandraRow == null) {
> +        cassandraRow = new CassandraRow();
> +        cassandraResultSet.putRow(key, cassandraRow);
> +        cassandraRow.setKey(key);
>       }
> +
> +      ColumnSlice<String, String> columnSlice = row.getColumnSlice();
> +
> +      for (HColumn<String, String> hColumn : columnSlice.getColumns()) {
> +        CassandraSubColumn cassandraSubColumn = new CassandraSubColumn();
> +        cassandraSubColumn.setValue(hColumn);
> +        cassandraSubColumn.setFamily(family);
> +        cassandraRow.add(cassandraSubColumn);
> +      }
> +
>     }
> -    stateManager.clearDirty(persistent);
> -    return persistent;
>   }
>
> -  @Override
> -  public void put(K key, T obj) throws IOException {
> -    Mutate mutate = new Mutate();
> -    Schema schema = obj.getSchema();
> -    StateManager stateManager = obj.getStateManager();
> -    List<Field> fields = schema.getFields();
> -    String qual;
> -    byte[] value;
> -    for (int i = 0; i < fields.size(); i++) {
> -      if (!stateManager.isDirty(obj, i)) {
> -        continue;
> +  private void addSuperColumns(String family, CassandraQuery<K, T> cassandraQuery,
> +      CassandraResultSet cassandraResultSet) {
> +
> +    List<SuperRow<String, String, String, String>> superRows = this.cassandraClient.executeSuper(cassandraQuery, family);
> +    for (SuperRow<String, String, String, String> superRow: superRows) {
> +      String key = superRow.getKey();
> +      CassandraRow cassandraRow = cassandraResultSet.getRow(key);
> +      if (cassandraRow == null) {
> +        cassandraRow = new CassandraRow();
> +        cassandraResultSet.putRow(key, cassandraRow);
> +        cassandraRow.setKey(key);
>       }
> -      Field field = fields.get(i);
> -      Type type = field.schema().getType();
> -      Object o = obj.get(i);
> -      CassandraColumn col = columnMap.get(field.name());
> -
> -      switch(type) {
> -      case MAP:
> -        if(o instanceof StatefulMap) {
> -          @SuppressWarnings("unchecked")
> -          StatefulMap<Utf8, ?> map = (StatefulMap<Utf8, ?>) o;
> -          for (Entry<Utf8, State> e : map.states().entrySet()) {
> -            Utf8 mapKey = e.getKey();
> -            switch (e.getValue()) {
> -            case DIRTY:
> -              qual = mapKey.toString();
> -              value = ByteUtils.toBytes(map.get(mapKey), field.schema().getValueType(), datumWriter);
> -              if (col.isSuperColumn()) {
> -                mutate.put(col.family, col.superColumn, qual, value);
> -              } else {
> -                mutate.put(col.family, qual, value);
> -              }
> -              break;
> -            case DELETED:
> -              qual = mapKey.toString();
> -              if (col.isSuperColumn()) {
> -                mutate.delete(col.family, col.superColumn, qual);
> -              } else {
> -                mutate.delete(col.family, qual);
> -              }
> -              break;
> -            }
> -          }
> -        } else {
> -          @SuppressWarnings({ "rawtypes", "unchecked" })
> -          Set<Map.Entry> set = ((Map)o).entrySet();
> -          for(@SuppressWarnings("rawtypes") Entry entry: set) {
> -            qual = entry.getKey().toString();
> -            value = ByteUtils.toBytes(entry.getValue().toString());
> -            if (col.isSuperColumn()) {
> -              mutate.put(col.family, col.superColumn, qual, value);
> -            } else {
> -              mutate.put(col.family, qual, value);
> -            }
> -          }
> -        }
> -        break;
> -      case ARRAY:
> -        if(o instanceof GenericArray) {
> -          @SuppressWarnings("rawtypes")
> -          GenericArray arr = (GenericArray) o;
> -          int j=0;
> -          for(Object item : arr) {
> -            value = ByteUtils.toBytes(item.toString());
> -            if (col.isSuperColumn()) {
> -              mutate.put(col.family, col.superColumn, Integer.toString(j), value);
> -            } else {
> -              mutate.put(col.family, Integer.toString(j), value);
> -            }
> -            j++;
> -          }
> -        }
> -        break;
> -      default:
> -        value = ByteUtils.toBytes(o, field.schema(), datumWriter);
> -        if (col.isSuperColumn()) {
> -          mutate.put(col.family, col.superColumn, col.column, value);
> -        } else {
> -          mutate.put(col.family, col.column, value);
> -        }
> -        break;
> +
> +      SuperSlice<String, String, String> superSlice = superRow.getSuperSlice();
> +      for (HSuperColumn<String, String, String> hSuperColumn: superSlice.getSuperColumns()) {
> +        CassandraSuperColumn cassandraSuperColumn = new CassandraSuperColumn();
> +        cassandraSuperColumn.setValue(hSuperColumn);
> +        cassandraSuperColumn.setFamily(family);
> +        cassandraRow.add(cassandraSuperColumn);
>       }
>     }
> -
> -    if(!mutate.isEmpty())
> -      client.mutate(key.toString(), mutate);
>   }
>
> -  @Override
> -  public boolean delete(K key) throws IOException {
> -    Mutate mutate = new Mutate();
> -    for (String family : mapping.getColumnFamilies()) {
> -      mutate.deleteAll(family);
> +  /**
> +   * Flush the buffer. Write the buffered rows.
> +   * @see org.apache.gora.store.DataStore#flush()
> +   */
> +  @Override
> +  public void flush() throws IOException {
> +    for (K key: this.buffer.keySet()) {
> +      T value = this.buffer.get(key);
> +      Schema schema = value.getSchema();
> +      for (Field field: schema.getFields()) {
> +        if (value.isDirty(field.pos())) {
> +          addOrUpdateField((String) key, field, value.get(field.pos()));
> +        }
> +      }
>     }
> -
> -    client.mutate(key.toString(), mutate);
> -    return true;
> +
> +    this.buffer.clear();
>   }
>
>   @Override
> -  public void flush() throws IOException { }
> -
> -  @Override
> -  public void close() throws IOException {
> -    client.close();
> +  public T get(K key, String[] fields) throws IOException {
> +    LOG.info("get " + key);
> +    return null;
>   }
>
>   @Override
> -  public Query<K, T> newQuery() {
> -    return new CassandraQuery<K, T>(this);
> +  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
> +      throws IOException {
> +    // just a single partition
> +    List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>();
> +    partitions.add(new PartitionQueryImpl<K,T>(query));
> +    return partitions;
>   }
>
>   @Override
> -  public long deleteByQuery(Query<K, T> query) throws IOException {
> -    // TODO Auto-generated method stub
> -    return 0;
> +  public String getSchemaName() {
> +    LOG.info("get schema name");
> +    return null;
>   }
>
>   @Override
> -  public Result<K, T> execute(Query<K, T> query) throws IOException {
> -    return new CassandraResult<K, T>(this, query, BATCH_COUNT);
> +  public Query<K, T> newQuery() {
> +    return new CassandraQuery<K, T>(this);
>   }
>
> -  @Override
> -  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
> -  throws IOException {
> -    List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>();
> -
> -    List<TokenRange> rangeList = client.describeRing();
> -    for (TokenRange range : rangeList) {
> -      List<String> tokens =
> -        client.describeSplits(range.start_token, range.end_token, SPLIT_SIZE);
> -      // turn the sub-ranges into InputSplits
> -      String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
> -      // hadoop needs hostname, not ip
> -      for (int i = 0; i < endpoints.length; i++) {
> -          endpoints[i] = InetAddress.getByName(endpoints[i]).getHostName();
> -      }
> -
> -      for (int i = 1; i < tokens.size(); i++) {
> -        CassandraPartitionQuery<K, T> partitionQuery =
> -          new CassandraPartitionQuery<K, T>(query, tokens.get(i - 1), tokens.get(i), endpoints, SPLIT_SIZE);
> -        partitions.add(partitionQuery);
> +  /**
> +   * Duplicate instance to keep all the objects in memory till flushing.
> +   * @see org.apache.gora.store.DataStore#put(java.lang.Object, org.apache.gora.persistency.Persistent)
> +   */
> +  @Override
> +  public void put(K key, T value) throws IOException {
> +    T p = (T) value.newInstance(new StateManagerImpl());
> +    Schema schema = value.getSchema();
> +    for (Field field: schema.getFields()) {
> +      if (value.isDirty(field.pos())) {
> +        Object fieldValue = value.get(field.pos());
> +
> +        // check if field has a nested structure (map or record)
> +        Schema fieldSchema = field.schema();
> +        Type type = fieldSchema.getType();
> +        switch(type) {
> +          case RECORD:
> +            Persistent persistent = (Persistent) fieldValue;
> +            Persistent newRecord = persistent.newInstance(new StateManagerImpl());
> +            for (Field member: fieldSchema.getFields()) {
> +              newRecord.put(member.pos(), persistent.get(member.pos()));
> +            }
> +            fieldValue = newRecord;
> +            break;
> +          case MAP:
> +            StatefulHashMap<?, ?> map = (StatefulHashMap<?, ?>) fieldValue;
> +            StatefulHashMap<?, ?> newMap = new StatefulHashMap(map);
> +            fieldValue = newMap;
> +            break;
> +        }
> +
> +        p.put(field.pos(), fieldValue);
>       }
>     }
> -    return partitions;
> -  }
> -
> -  private CassandraClient createClient() throws IOException {
> -    String serverStr =
> -      DataStoreFactory.findPropertyOrDie(properties, this, "servers");
> -    String[] server1Parts = serverStr.split(",")[0].split(":");
> -    try {
> -      return new SimpleCassandraClient(server1Parts[0],
> -          Integer.parseInt(server1Parts[1]), mapping.getKeySpace());
> -    } catch (Exception e) {
> -      throw new IOException(e);
> -    }
> -  }
> -
> -  @SuppressWarnings("unchecked")
> -  protected void readMapping(String filename) throws IOException {
> -
> -    mapping = new CassandraMapping();
> -    columnMap = new HashMap<String, CassandraColumn>();
> -
> -    try {
> -      SAXBuilder builder = new SAXBuilder();
> -      Document doc = builder.build(getClass().getClassLoader()
> -          .getResourceAsStream(filename));
> -
> -      List<Element> classes = doc.getRootElement().getChildren("class");
> -
> -      for(Element classElement: classes) {
> -        if(classElement.getAttributeValue("keyClass").equals(keyClass.getCanonicalName())
> -            && classElement.getAttributeValue("name").equals(
> -                persistentClass.getCanonicalName())) {
> -
> -          String keySpace = classElement.getAttributeValue("keyspace");
> -          mapping.setKeySpace(keySpace);
> -          client = createClient();
> -          Map<String, Map<String, String>> keySpaceDesc = client.describeKeySpace();
> -          for (Entry<String, Map<String, String>> e : keySpaceDesc.entrySet()) {
> -            boolean isSuper = e.getValue().get("Type").equals("Super");
> -            mapping.addColumnFamily(e.getKey(), isSuper);
> -          }
> -
> -          List<Element> fields = classElement.getChildren("field");
> -
> -          for(Element field:fields) {
> -            String fieldName = field.getAttributeValue("name");
> -            String path = field.getAttributeValue("path");
> -            String[] parts = path.split(":");
> -            String columnFamily = parts[0];
> -            String superColumn = null;
> -            String column = null;
> -
> -            boolean isSuper = mapping.isColumnFamilySuper(columnFamily);
> -            if (isSuper) {
> -              superColumn = parts[1];
> -              if (parts.length == 3) {
> -                column = parts[2];
> +
> +    this.buffer.put(key, p);
> + }
> +
> +  /**
> +   * Add a field to Cassandra according to its type.
> +   * @param key     the key of the row where the field should be added
> +   * @param field   the Avro field representing a datum
> +   * @param value   the field value
> +   */
> +  private void addOrUpdateField(String key, Field field, Object value) {
> +    Schema schema = field.schema();
> +    Type type = schema.getType();
> +    //LOG.info(field.name() + " " + type.name());
> +    switch (type) {
> +      case STRING:
> +        this.cassandraClient.addColumn(key, field.name(), value);
> +        break;
> +      case INT:
> +        this.cassandraClient.addColumn(key, field.name(), value);
> +        break;
> +      case LONG:
> +        this.cassandraClient.addColumn(key, field.name(), value);
> +        break;
> +      case BYTES:
> +        this.cassandraClient.addColumn(key, field.name(), value);
> +        break;
> +      case FLOAT:
> +        this.cassandraClient.addColumn(key, field.name(), value);
> +        break;
> +      case RECORD:
> +        if (value != null) {
> +          if (value instanceof PersistentBase) {
> +            PersistentBase persistentBase = (PersistentBase) value;
> +            for (Field member: schema.getFields()) {
> +
> +              // TODO: hack, do not store empty arrays
> +              Object memberValue = persistentBase.get(member.pos());
> +              if (memberValue instanceof GenericArray<?>) {
> +                GenericArray<String> array = (GenericArray<String>) memberValue;
> +                if (array.size() == 0) {
> +                  continue;
> +                }
>               }
> -            } else {
> -              if (parts.length == 2) {
> -                column = parts[1];
> +
> +              this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
> +            }
> +          } else {
> +            LOG.info("Record not supported: " + value.toString());
> +
> +          }
> +        }
> +        break;
> +      case MAP:
> +        if (value != null) {
> +          if (value instanceof StatefulHashMap<?, ?>) {
> +            //TODO cast to stateful map and only write dirty keys
> +            Map<Utf8, Object> map = (Map<Utf8, Object>) value;
> +            for (Utf8 mapKey: map.keySet()) {
> +
> +              // TODO: hack, do not store empty arrays
> +              Object keyValue = map.get(mapKey);
> +              if (keyValue instanceof GenericArray<?>) {
> +                GenericArray<String> array = (GenericArray<String>) keyValue;
> +                if (array.size() == 0) {
> +                  continue;
> +                }
>               }
> +
> +              this.cassandraClient.addSubColumn(key, field.name(), mapKey.toString(), keyValue);
>             }
> -
> -            columnMap.put(fieldName,
> -                new CassandraColumn(columnFamily, superColumn, column));
> +          } else {
> +            LOG.info("Map not supported: " + value.toString());
>           }
> -
> -          break;
>         }
> -      }
> -    } catch(Exception ex) {
> -      throw new IOException(ex);
> +        break;
> +      default:
> +        LOG.info("Type not considered: " + type.name());
>     }
>   }
> -}
> \ No newline at end of file
> +
> +  @Override
> +  public boolean schemaExists() throws IOException {
> +    LOG.info("schema exists");
> +    return false;
> +  }
> +
> +}
>
> Modified: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java (original)
> +++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java Fri Jul 22 00:33:59 2011
> @@ -36,8 +36,8 @@ import org.slf4j.LoggerFactory;
>  public class GoraRecordReader<K, T extends Persistent> extends RecordReader<K,T> {
>   public static final Logger LOG = LoggerFactory.getLogger(GoraRecordReader.class);
>
> -  private static final String BUFFER_LIMIT_READ_NAME = "gora.buffer.read.limit";
> -  private static final int BUFFER_LIMIT_READ_VALUE = 10000;
> +  public static final String BUFFER_LIMIT_READ_NAME = "gora.buffer.read.limit";
> +  public static final int BUFFER_LIMIT_READ_VALUE = 10000;
>
>   protected Query<K,T> query;
>   protected Result<K,T> result;
>
>
>

Re: svn commit: r1149420 - in /incubator/gora/trunk: gora-cassandra/ivy/ gora-cassandra/lib-ext/ gora-cassandra/src/main/java/org/apache/gora/cassandra/client/ gora-cassandra/src/main/java/org/apache/gora/cassandra/query/ gora-cassandra/src/main/java

Posted by Alexis <al...@gmail.com>.
My bad. The Apache license headers should now be included as of the new revision, r1149425.
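
For reference, here is a minimal sketch of how a missing header could be caught before committing. It is not part of the Gora build: the class name LicenseHeaderCheck and the method hasLicenseHeader are hypothetical, and the check only looks for the opening phrase of the ASF header ahead of the package declaration. A release audit tool such as Apache RAT is the more likely choice in practice.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper, for illustration only; not part of Gora. */
public class LicenseHeaderCheck {

  // Opening phrase of the ASF source header, as quoted in the diff in this thread.
  private static final String ASF_MARKER =
      "Licensed to the Apache Software Foundation (ASF)";

  // Finds the first "package" declaration that starts a line.
  private static final Pattern PACKAGE_DECL = Pattern.compile("(?m)^\\s*package\\s");

  /** Returns true if the ASF marker appears before the package declaration. */
  public static boolean hasLicenseHeader(String source) {
    Matcher m = PACKAGE_DECL.matcher(source);
    // If there is no package declaration, search the whole file.
    int end = m.find() ? m.start() : source.length();
    return source.substring(0, end).contains(ASF_MARKER);
  }

  public static void main(String[] args) {
    String withHeader =
        "/**\n * Licensed to the Apache Software Foundation (ASF) under one\n */\n"
        + "package org.apache.gora.cassandra.query;\n";
    String withoutHeader = "package org.apache.gora.cassandra.query;\n";

    System.out.println("with header:    " + hasLicenseHeader(withHeader));
    System.out.println("without header: " + hasLicenseHeader(withoutHeader));
  }
}
```

Running something like this over the files listed under "Added:" would flag the ones that begin directly with a package statement, such as CassandraColumn.java in the diff below.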

On Thu, Jul 21, 2011 at 5:39 PM, Henry Saputra <he...@gmail.com> wrote:
> Looks like some files missing Apache license header.
>
> - Henry
>
> On Thu, Jul 21, 2011 at 5:34 PM,  <al...@apache.org> wrote:
>> Author: alexis
>> Date: Fri Jul 22 00:33:59 2011
>> New Revision: 1149420
>>
>> URL: http://svn.apache.org/viewvc?rev=1149420&view=rev
>> Log:
>> Cassandra 0.8 backend with Hector client
>>
>> Added:
>>    incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar   (with props)
>>    incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar   (with props)
>>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
>>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
>>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
>>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
>>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
>>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
>> Removed:
>>    incubator/gora/trunk/gora-cassandra/lib-ext/apache-cassandra-0.6.4.jar
>>    incubator/gora/trunk/gora-cassandra/lib-ext/libthrift-r917130.jar
>>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/client/
>>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraColumn.java
>> Modified:
>>    incubator/gora/trunk/gora-cassandra/ivy/ivy.xml
>>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java
>>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
>>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
>>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
>>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
>>    incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java
>>
>> Modified: incubator/gora/trunk/gora-cassandra/ivy/ivy.xml
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/ivy/ivy.xml?rev=1149420&r1=1149419&r2=1149420&view=diff
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/ivy/ivy.xml (original)
>> +++ incubator/gora/trunk/gora-cassandra/ivy/ivy.xml Fri Jul 22 00:33:59 2011
>> @@ -22,7 +22,7 @@
>>       organisation="org.apache.gora"
>>       module="gora-cassandra"
>>       status="integration"/>
>> -
>> +
>>   <configurations>
>>     <include file="${project.dir}/ivy/ivy-configurations.xml"/>
>>   </configurations>
>> @@ -32,15 +32,25 @@
>>     <artifact name="gora-cassandra-test" conf="test"/>
>>   </publications>
>>
>> +
>>   <dependencies>
>>     <!-- conf="*->@" means every conf is mapped to the conf of the same name of the artifact-->
>> -    <dependency org="org.apache.gora" name="gora-core" rev="latest.integration" changing="true" conf="*->@"/>
>> -    <dependency org="org.jdom" name="jdom" rev="1.1" conf="*->master"/>
>> -
>> -    <dependency org="org.slf4j" name="slf4j-jdk14" rev="1.5.8" conf="*->master"/>
>> -    <dependency org="org.slf4j" name="slf4j-api" rev="1.5.8" conf="*->master"/>
>> -
>> -    <dependency org="com.google.guava" name="guava" rev="r06"/>
>> +
>> +    <dependency org="org.apache.gora" name="gora-core" rev="latest.integration" changing="true" conf="*->@"/>
>> +
>> +    <dependency org="org.jdom" name="jdom" rev="1.1">
>> +       <exclude org="xerces" name="xercesImpl"/>
>> +    </dependency>
>> +
>> +    <!--
>> +        <dependency org="org.apache.cassandra" name="apache-cassandra" rev="0.8.1"/>
>> +       <dependency org="me.prettyprint" name="hector" rev="0.8.0-1"/>
>> +    -->
>> +    <dependency org="org.apache.cassandra" name="cassandra-thrift" rev="0.8.1"/>
>> +    <dependency org="com.ecyrd.speed4j" name="speed4j" rev="0.9" conf="*->*,!javadoc,!sources"/>
>> +    <dependency org="com.github.stephenc.high-scale-lib" name="high-scale-lib" rev="1.1.2" conf="*->*,!javadoc,!sources"/>
>> +    <dependency org="com.google.collections" name="google-collections" rev="1.0" conf="*->*,!javadoc,!sources"/>
>> +    <dependency org="com.google.guava" name="guava" rev="r09" conf="*->*,!javadoc,!sources"/>
>>
>>     <!-- test dependencies -->
>>
>>
>> Added: incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar?rev=1149420&view=auto
>> ==============================================================================
>> Binary file - no diff available.
>>
>> Propchange: incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar
>> ------------------------------------------------------------------------------
>>    svn:mime-type = application/octet-stream
>>
>> Added: incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar?rev=1149420&view=auto
>> ==============================================================================
>> Binary file - no diff available.
>>
>> Propchange: incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar
>> ------------------------------------------------------------------------------
>>    svn:mime-type = application/octet-stream
>>
>> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1149420&view=auto
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (added)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Fri Jul 22 00:33:59 2011
>> @@ -0,0 +1,41 @@
>> +package org.apache.gora.cassandra.query;
>> +
>> +import org.apache.avro.Schema.Field;
>> +
>> +
>> +/**
>> + * Represents a unit of data: a key value pair tagged by a family name
>> + */
>> +public abstract class CassandraColumn {
>> +  public static final int SUB = 0;
>> +  public static final int SUPER = 1;
>> +
>> +  private String family;
>> +  private int type;
>> +  private Field field;
>> +
>> +  public String getFamily() {
>> +    return family;
>> +  }
>> +  public void setFamily(String family) {
>> +    this.family = family;
>> +  }
>> +  public int getType() {
>> +    return type;
>> +  }
>> +  public void setType(int type) {
>> +    this.type = type;
>> +  }
>> +  public void setField(Field field) {
>> +    this.field = field;
>> +  }
>> +
>> +  protected Field getField() {
>> +    return this.field;
>> +  }
>> +
>> +  public abstract String getName();
>> +  public abstract Object getValue();
>> +
>> +
>> +}
>>
>> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java?rev=1149420&r1=1149419&r2=1149420&view=diff
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java (original)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java Fri Jul 22 00:33:59 2011
>> @@ -1,93 +0,0 @@
>> -/**
>> - * Licensed to the Apache Software Foundation (ASF) under one
>> - * or more contributor license agreements.  See the NOTICE file
>> - * distributed with this work for additional information
>> - * regarding copyright ownership.  The ASF licenses this file
>> - * to you under the Apache License, Version 2.0 (the
>> - * "License"); you may not use this file except in compliance
>> - * with the License.  You may obtain a copy of the License at
>> - *
>> - *     http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> - * See the License for the specific language governing permissions and
>> - * limitations under the License.
>> - */
>> -package org.apache.gora.cassandra.query;
>> -
>> -import java.io.DataInput;
>> -import java.io.DataOutput;
>> -import java.io.IOException;
>> -
>> -import org.apache.gora.persistency.Persistent;
>> -import org.apache.gora.query.Query;
>> -import org.apache.gora.query.impl.PartitionQueryImpl;
>> -import org.apache.hadoop.io.Text;
>> -
>> -public class CassandraPartitionQuery<K, T extends Persistent>
>> -extends PartitionQueryImpl<K, T> {
>> -
>> -  private String startToken;
>> -
>> -  private String endToken;
>> -
>> -  private String[] endPoints;
>> -
>> -  private int splitSize;
>> -
>> -  public CassandraPartitionQuery() {
>> -    this.dataStore = null;
>> -  }
>> -
>> -  public CassandraPartitionQuery(Query<K, T> baseQuery, String startToken, String endToken, String[] endPoints,
>> -      int splitSize) {
>> -    super(baseQuery);
>> -    this.startToken = startToken;
>> -    this.endToken = endToken;
>> -    this.endPoints = endPoints;
>> -    this.splitSize = splitSize;
>> -  }
>> -
>> -  @Override
>> -  public void write(DataOutput out) throws IOException {
>> -    super.write(out);
>> -    Text.writeString(out, startToken);
>> -    Text.writeString(out, endToken);
>> -    out.writeInt(endPoints.length);
>> -    for (String endPoint : endPoints) {
>> -      Text.writeString(out, endPoint);
>> -    }
>> -    out.writeInt(splitSize);
>> -  }
>> -
>> -  @Override
>> -  public void readFields(DataInput in) throws IOException {
>> -    super.readFields(in);
>> -    startToken = Text.readString(in);
>> -    endToken = Text.readString(in);
>> -    int size = in.readInt();
>> -    endPoints = new String[size];
>> -    for (int i = 0; i < size; i++) {
>> -      endPoints[i] = Text.readString(in);
>> -    }
>> -    splitSize = in.readInt();
>> -  }
>> -
>> -  public String getStartToken() {
>> -    return startToken;
>> -  }
>> -
>> -  public String getEndToken() {
>> -    return endToken;
>> -  }
>> -
>> -  public String[] getEndPoints() {
>> -    return endPoints;
>> -  }
>> -
>> -  public int getSplitSize() {
>> -    return splitSize;
>> -  }
>> -}
>>
>> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java?rev=1149420&r1=1149419&r2=1149420&view=diff
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java (original)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java Fri Jul 22 00:33:59 2011
>> @@ -1,34 +1,55 @@
>> -/**
>> - * Licensed to the Apache Software Foundation (ASF) under one
>> - * or more contributor license agreements.  See the NOTICE file
>> - * distributed with this work for additional information
>> - * regarding copyright ownership.  The ASF licenses this file
>> - * to you under the Apache License, Version 2.0 (the
>> - * "License"); you may not use this file except in compliance
>> - * with the License.  You may obtain a copy of the License at
>> - *
>> - *     http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> - * See the License for the specific language governing permissions and
>> - * limitations under the License.
>> - */
>>  package org.apache.gora.cassandra.query;
>>
>> +import java.util.List;
>> +import java.util.Map;
>> +
>>  import org.apache.gora.persistency.Persistent;
>> +import org.apache.gora.query.Query;
>>  import org.apache.gora.query.impl.QueryBase;
>>  import org.apache.gora.store.DataStore;
>>
>> -public class CassandraQuery<K, T extends Persistent>
>> -extends QueryBase<K, T> {
>> +public class CassandraQuery<K, T extends Persistent> extends QueryBase<K, T> {
>>
>> +  private Query<K, T> query;
>> +
>> +  /**
>> +   * Maps Avro fields to Cassandra columns.
>> +   */
>> +  private Map<String, List<String>> familyMap;
>> +
>>   public CassandraQuery() {
>>     super(null);
>>   }
>> -
>>   public CassandraQuery(DataStore<K, T> dataStore) {
>>     super(dataStore);
>>   }
>> +  public void setFamilyMap(Map<String, List<String>> familyMap) {
>> +    this.familyMap = familyMap;
>> +  }
>> +  public Map<String, List<String>> getFamilyMap() {
>> +    return familyMap;
>> +  }
>> +
>> +  /**
>> +   * @param family the family name
>> +   * @return an array of the query column names belonging to the family
>> +   */
>> +  public String[] getColumns(String family) {
>> +
>> +    List<String> columnList = familyMap.get(family);
>> +    String[] columns = new String[columnList.size()];
>> +    for (int i = 0; i < columns.length; ++i) {
>> +      columns[i] = columnList.get(i);
>> +    }
>> +    return columns;
>> +  }
>> +  public Query<K, T> getQuery() {
>> +    return query;
>> +  }
>> +  public void setQuery(Query<K, T> query) {
>> +    this.query = query;
>> +  }
>> +
>> +
>> +
>>  }
>>
>> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java?rev=1149420&r1=1149419&r2=1149420&view=diff
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java (original)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java Fri Jul 22 00:33:59 2011
>> @@ -1,157 +1,97 @@
>> -/**
>> - * Licensed to the Apache Software Foundation (ASF) under one
>> - * or more contributor license agreements.  See the NOTICE file
>> - * distributed with this work for additional information
>> - * regarding copyright ownership.  The ASF licenses this file
>> - * to you under the Apache License, Version 2.0 (the
>> - * "License"); you may not use this file except in compliance
>> - * with the License.  You may obtain a copy of the License at
>> - *
>> - *     http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> - * See the License for the specific language governing permissions and
>> - * limitations under the License.
>> - */
>>  package org.apache.gora.cassandra.query;
>>
>>  import java.io.IOException;
>> -import java.net.InetAddress;
>> -import java.net.UnknownHostException;
>> -import java.util.Iterator;
>> -
>> -import org.apache.gora.cassandra.client.CassandraClient;
>> -import org.apache.gora.cassandra.client.Row;
>> -import org.apache.gora.cassandra.client.Select;
>> -import org.apache.gora.cassandra.store.CassandraStore;
>> +import java.util.List;
>> +import java.util.Map;
>> +
>> +import org.apache.avro.Schema;
>> +import org.apache.avro.Schema.Field;
>>  import org.apache.gora.persistency.Persistent;
>>  import org.apache.gora.query.Query;
>>  import org.apache.gora.query.impl.ResultBase;
>>  import org.apache.gora.store.DataStore;
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>>
>> -public class CassandraResult<K, T extends Persistent>
>> -extends ResultBase<K, T> {
>> -
>> -  private Iterator<Row> rowIter;
>> -
>> -  private CassandraStore<K, T> store;
>> +public class CassandraResult<K, T extends Persistent> extends ResultBase<K, T> {
>> +  public static final Logger LOG = LoggerFactory.getLogger(CassandraResult.class);
>> +
>> +  private int rowNumber;
>> +
>> +  private CassandraResultSet cassandraResultSet;
>> +
>> +  /**
>> +   * Maps Cassandra columns to Avro fields.
>> +   */
>> +  private Map<String, String> reverseMap;
>>
>> -  private String[] fields;
>> -
>> -  public CassandraResult(DataStore<K, T> dataStore, Query<K, T> query,
>> -      int batchRowCount) throws IOException {
>> +  public CassandraResult(DataStore<K, T> dataStore, Query<K, T> query) {
>>     super(dataStore, query);
>> -
>> -    store = (CassandraStore<K, T>) dataStore;
>> -    fields = query.getFields();
>> -
>> -    boolean isUsingTokens = (query instanceof CassandraPartitionQuery);
>> -    String startTokenOrKey;
>> -    String endTokenOrKey;
>> -
>> -    if (isUsingTokens) {
>> -      CassandraPartitionQuery<K, T> partitionQuery = (CassandraPartitionQuery<K, T>) query;
>> -      startTokenOrKey = partitionQuery.getStartToken();
>> -      endTokenOrKey = partitionQuery.getEndToken();
>> -    } else {
>> -      CassandraQuery<K, T> cassandraQuery = (CassandraQuery<K, T>) query;
>> -      startTokenOrKey = cassandraQuery.getStartKey().toString();
>> -      endTokenOrKey = cassandraQuery.getEndKey().toString();
>> -    }
>> -
>> -    Select select = store.createSelect(fields);
>> -
>> -    CassandraClient client = store.getClientByLocation(getLocation(query));
>> -    if (isUsingTokens) {
>> -      rowIter =
>> -        client.getTokenRange(startTokenOrKey, endTokenOrKey,
>> -            batchRowCount, select).iterator();
>> -    } else {
>> -      rowIter = client.getRange(startTokenOrKey, endTokenOrKey,
>> -          batchRowCount, select).iterator();
>> -    }
>> -  }
>> -
>> -  @Override
>> -  public float getProgress() throws IOException {
>> -    return 0;
>>   }
>>
>>   @Override
>>   protected boolean nextInner() throws IOException {
>> -    if (!rowIter.hasNext()) {
>> -      return false;
>> -    }
>> -    Row row = rowIter.next();
>> -    if (row == null) {
>> -      return false;
>> +    if (this.rowNumber < this.cassandraResultSet.size()) {
>> +      updatePersistent();
>>     }
>> -
>> -    key = toKey(row.getKey());
>> -    persistent = store.newInstance(row, fields);
>> -
>> -    return true;
>> +    ++this.rowNumber;
>> +    return (this.rowNumber <= this.cassandraResultSet.size());
>>   }
>>
>> +
>> +  /**
>> +   * Load key/value pair from Cassandra row to Avro record.
>> +   * @throws IOException
>> +   */
>>   @SuppressWarnings("unchecked")
>> -  private K toKey(String keyStr) {
>> -    Class<K> keyClass = dataStore.getKeyClass();
>> -    if (keyClass.isAssignableFrom(String.class)) {
>> -      return (K) keyStr;
>> -    }
>> -    if (keyClass.isAssignableFrom(Integer.class)) {
>> -      return (K) (Integer) Integer.parseInt(keyStr);
>> -    }
>> -    if (keyClass.isAssignableFrom(Float.class)) {
>> -      return (K) (Float) Float.parseFloat(keyStr);
>> -    }
>> -    if (keyClass.isAssignableFrom(Double.class)) {
>> -      return (K) (Double) Double.parseDouble(keyStr);
>> -    }
>> -    if (keyClass.isAssignableFrom(Long.class)) {
>> -      return (K) (Long) Long.parseLong(keyStr);
>> -    }
>> -    if (keyClass.isAssignableFrom(Short.class)) {
>> -      return (K) (Short) Short.parseShort(keyStr);
>> -    }
>> -    if (keyClass.isAssignableFrom(Byte.class)) {
>> -      return (K) (Byte) Byte.parseByte(keyStr);
>> +  private void updatePersistent() throws IOException {
>> +    CassandraRow cassandraRow = this.cassandraResultSet.get(this.rowNumber);
>> +
>> +    // load key
>> +    this.key = (K) cassandraRow.getKey();
>> +
>> +    // load value
>> +    Schema schema = this.persistent.getSchema();
>> +    List<Field> fields = schema.getFields();
>> +
>> +    for (CassandraColumn cassandraColumn: cassandraRow) {
>> +
>> +      // get field name
>> +      String family = cassandraColumn.getFamily();
>> +      String fieldName = this.reverseMap.get(family + ":" + cassandraColumn.getName());
>> +
>> +      // get field
>> +      int pos = this.persistent.getFieldIndex(fieldName);
>> +      Field field = fields.get(pos);
>> +
>> +      // get value
>> +      cassandraColumn.setField(field);
>> +      Object value = cassandraColumn.getValue();
>> +
>> +      this.persistent.put(pos, value);
>> +      // this field does not need to be written back to the store
>> +      this.persistent.clearDirty(pos);
>>     }
>>
>> -    throw new RuntimeException("Can't parse " + keyStr +
>> -                               " as an instance of " + keyClass);
>>   }
>>
>>   @Override
>> -  public void close() throws IOException { }
>> +  public void close() throws IOException {
>> +    // TODO Auto-generated method stub
>> +
>> +  }
>>
>> -  private String getLocation(Query<K, T> query) {
>> -    if (!(query instanceof CassandraPartitionQuery)) {
>> -      return null;
>> -    }
>> -    CassandraPartitionQuery<K, T> partitonQuery =
>> -      (CassandraPartitionQuery<K, T>) query;
>> -    InetAddress[] localAddresses = new InetAddress[0];
>> -    try {
>> -      localAddresses = InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress());
>> -    } catch (UnknownHostException e) {
>> -      throw new AssertionError(e);
>> -    }
>> -    for (InetAddress address : localAddresses) {
>> -      for (String location : partitonQuery.getEndPoints()) {
>> -        InetAddress locationAddress = null;
>> -        try {
>> -          locationAddress = InetAddress.getByName(location);
>> -        } catch (UnknownHostException e) {
>> -          throw new AssertionError(e);
>> -        }
>> -        if (address.equals(locationAddress)) {
>> -          return location;
>> -        }
>> -      }
>> -    }
>> -    return partitonQuery.getEndPoints()[0];
>> +  @Override
>> +  public float getProgress() throws IOException {
>> +    return (((float) this.rowNumber) / this.cassandraResultSet.size());
>>   }
>> -}
>> \ No newline at end of file
>> +
>> +  public void setResultSet(CassandraResultSet cassandraResultSet) {
>> +    this.cassandraResultSet = cassandraResultSet;
>> +  }
>> +
>> +  public void setReverseMap(Map<String, String> reverseMap) {
>> +    this.reverseMap = reverseMap;
>> +  }
>> +
>> +}
>>
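On the new CassandraResult: getProgress() divides rowNumber by cassandraResultSet.size(), which gives NaN for an empty result set, and nextInner() keeps incrementing rowNumber after the last row, so progress can exceed 1. Here is a rough sketch of the same cursor idea with both cases guarded. RowCursor and its method names are hypothetical, and reporting an empty result as fully processed (1.0) is an assumption, not something the commit specifies.

```java
import java.util.List;

// Hypothetical cursor over an in-memory row list; not the Gora ResultBase API.
final class RowCursor<R> {
  private final List<R> rows;
  private int next;      // index of the next row to hand out
  private R current;     // row returned by the last successful advance()

  RowCursor(List<R> rows) {
    this.rows = rows;
  }

  /** Moves to the next row; returns false once the list is exhausted. */
  boolean advance() {
    if (next >= rows.size()) {
      return false;      // do not move past the end, so progress stays at 1.0
    }
    current = rows.get(next++);
    return true;
  }

  R current() {
    return current;
  }

  /** Fraction of rows consumed; assumption: an empty list counts as complete. */
  float progress() {
    if (rows.isEmpty()) {
      return 1.0f;
    }
    return (float) next / rows.size();
  }
}
```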
>> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java?rev=1149420&view=auto
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java (added)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java Fri Jul 22 00:33:59 2011
>> @@ -0,0 +1,36 @@
>> +package org.apache.gora.cassandra.query;
>> +
>> +import java.util.ArrayList;
>> +import java.util.HashMap;
>> +
>> +/**
>> + * List data structure to keep the order coming from the Cassandra selects.
>> + */
>> +public class CassandraResultSet extends ArrayList<CassandraRow> {
>> +
>> +  /**
>> +   *
>> +   */
>> +  private static final long serialVersionUID = -7620939600192859652L;
>> +
>> +  /**
>> +   * Maps keys to indices in the list.
>> +   */
>> +  private HashMap<String, Integer> indexMap = new HashMap<String, Integer>();
>> +
>> +  public CassandraRow getRow(String key) {
>> +    Integer integer = this.indexMap.get(key);
>> +    if (integer == null) {
>> +      return null;
>> +    }
>> +
>> +    return this.get(integer);
>> +  }
>> +
>> +  public void putRow(String key, CassandraRow cassandraRow) {
>> +    this.add(cassandraRow);
>> +    this.indexMap.put(key, this.size()-1);
>> +  }
>> +
>> +
>> +}
>> \ No newline at end of file
>>
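On CassandraResultSet: because it extends ArrayList, callers can still use add() or remove() directly, and then indexMap no longer matches the list. One possible alternative, sketched under assumptions, is to wrap the list instead of extending it. KeyedRowList is a hypothetical name, and replacing the row when a key is put twice is an assumption (the committed putRow appends a second entry).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical composition-based variant; not part of the commit.
final class KeyedRowList<R> {
  private final List<R> rows = new ArrayList<R>();
  private final Map<String, Integer> indexByKey = new HashMap<String, Integer>();

  /** Appends a row, or replaces the existing row for the same key (assumption). */
  void putRow(String key, R row) {
    Integer existing = indexByKey.get(key);
    if (existing != null) {
      rows.set(existing, row);
      return;
    }
    indexByKey.put(key, rows.size());
    rows.add(row);
  }

  /** Looks a row up by key; returns null when the key is unknown. */
  R getRow(String key) {
    Integer index = indexByKey.get(key);
    return (index == null) ? null : rows.get(index);
  }

  /** Positional access keeps the order in which rows were first inserted. */
  R get(int index) {
    return rows.get(index);
  }

  int size() {
    return rows.size();
  }
}
```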
>> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java?rev=1149420&view=auto
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java (added)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java Fri Jul 22 00:33:59 2011
>> @@ -0,0 +1,24 @@
>> +package org.apache.gora.cassandra.query;
>> +
>> +import java.util.ArrayList;
>> +
>> +/**
>> + * List of key value pairs representing a row, tagged by a key.
>> + */
>> +public class CassandraRow extends ArrayList<CassandraColumn> {
>> +
>> +  /**
>> +   *
>> +   */
>> +  private static final long serialVersionUID = -7620939600192859652L;
>> +  private String key;
>> +
>> +  public String getKey() {
>> +    return this.key;
>> +  }
>> +
>> +  public void setKey(String key) {
>> +    this.key = key;
>> +  }
>> +
>> +}
>>
>> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1149420&view=auto
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java (added)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java Fri Jul 22 00:33:59 2011
>> @@ -0,0 +1,104 @@
>> +package org.apache.gora.cassandra.query;
>> +
>> +import java.nio.ByteBuffer;
>> +import java.nio.CharBuffer;
>> +import java.nio.charset.CharacterCodingException;
>> +import java.nio.charset.Charset;
>> +import java.nio.charset.CharsetEncoder;
>> +
>> +import me.prettyprint.hector.api.beans.HColumn;
>> +
>> +import org.apache.avro.Schema;
>> +import org.apache.avro.Schema.Field;
>> +import org.apache.avro.Schema.Type;
>> +import org.apache.avro.generic.GenericArray;
>> +import org.apache.avro.generic.GenericData;
>> +import org.apache.avro.util.Utf8;
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>> +
>> +public class CassandraSubColumn extends CassandraColumn {
>> +  public static final Logger LOG = LoggerFactory.getLogger(CassandraSubColumn.class);
>> +
>> +  private static final String ENCODING = "UTF-8";
>> +
>> +  private static CharsetEncoder charsetEncoder = Charset.forName(ENCODING).newEncoder();
>> +
>> +
>> +  /**
>> +   * Key-value pair containing the raw data.
>> +   */
>> +  private HColumn<String, String> hColumn;
>> +
>> +  public String getName() {
>> +    return hColumn.getName();
>> +  }
>> +
>> +  /**
>> +   * Deserialize a String into a typed Object, according to the field schema.
>> +   * @see org.apache.gora.cassandra.query.CassandraColumn#getValue()
>> +   */
>> +  public Object getValue() {
>> +    Field field = getField();
>> +    Schema fieldSchema = field.schema();
>> +    Type type = fieldSchema.getType();
>> +    String valueString = hColumn.getValue();
>> +    Object value = null;
>> +
>> +    switch (type) {
>> +      case STRING:
>> +        value = new Utf8(valueString);
>> +        break;
>> +      case BYTES:
>> +        // convert string to bytebuffer
>> +        value = getByteBuffer(valueString);
>> +        break;
>> +      case INT:
>> +        value = Integer.parseInt(valueString);
>> +        break;
>> +      case LONG:
>> +        value = Long.parseLong(valueString);
>> +        break;
>> +      case FLOAT:
>> +        value = Float.parseFloat(valueString);
>> +        break;
>> +      case ARRAY:
>> +        // convert string to array
>> +        valueString = valueString.substring(1, valueString.length()-1);
>> +        String[] elements = valueString.split(", ");
>> +
>> +        Type elementType = fieldSchema.getElementType().getType();
>> +        if (elementType == Schema.Type.STRING) {
>> +          // the array type is String
>> +          GenericArray<String> genericArray = new GenericData.Array<String>(elements.length, Schema.createArray(Schema.create(Schema.Type.STRING)));
>> +          for (String element: elements) {
>> +            genericArray.add(element);
>> +          }
>> +
>> +          value = genericArray;
>> +        } else {
>> +          LOG.info("Element type not supported: " + elementType);
>> +        }
>> +        break;
>> +      default:
>> +        LOG.info("Type not supported: " + type);
>> +    }
>> +
>> +    return value;
>> +
>> +  }
>> +
>> +  public void setValue(HColumn<String, String> hColumn) {
>> +    this.hColumn = hColumn;
>> +  }
>> +
>> +  public static ByteBuffer getByteBuffer(String valueString) {
>> +    ByteBuffer byteBuffer = null;
>> +    try {
>> +      byteBuffer = charsetEncoder.encode(CharBuffer.wrap(valueString));
>> +    } catch (CharacterCodingException cce) {
>> +      LOG.warn("Unable to encode " + valueString + " into " + ENCODING);
>> +    }
>> +    return byteBuffer;
>> +  }
>> +}
>>
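Two details in CassandraSubColumn may be worth a second look. The static CharsetEncoder is shared by all instances, and CharsetEncoder objects are not safe for concurrent use. The ARRAY branch also turns the string "[]" into an array with one empty element, since splitting an empty string yields a single empty token. A small sketch of both conversions without shared state follows. ColumnText, toUtf8 and parseList are hypothetical names, and StandardCharsets assumes Java 7 or later.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the commit.
final class ColumnText {
  private ColumnText() {}

  /** Encodes a string as UTF-8 without sharing a CharsetEncoder between threads. */
  static ByteBuffer toUtf8(String value) {
    return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
  }

  /**
   * Parses the "[a, b, c]" form produced by List.toString().
   * Assumption: elements never contain the ", " separator themselves.
   */
  static List<String> parseList(String text) {
    String trimmed = text.trim();
    if (trimmed.length() < 2
        || trimmed.charAt(0) != '['
        || trimmed.charAt(trimmed.length() - 1) != ']') {
      throw new IllegalArgumentException("Not a bracketed list: " + text);
    }
    String body = trimmed.substring(1, trimmed.length() - 1);
    List<String> elements = new ArrayList<String>();
    if (body.isEmpty()) {
      return elements;   // "[]" is an empty list, not a list holding ""
    }
    for (String element : body.split(", ")) {
      elements.add(element);
    }
    return elements;
  }
}
```

This still stores values as text; it does not address the TODO about keeping binary fields as bytes.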
>> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1149420&view=auto
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java (added)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java Fri Jul 22 00:33:59 2011
>> @@ -0,0 +1,102 @@
>> +package org.apache.gora.cassandra.query;
>> +
>> +import java.util.Map;
>> +
>> +import me.prettyprint.hector.api.beans.HColumn;
>> +import me.prettyprint.hector.api.beans.HSuperColumn;
>> +
>> +import org.apache.avro.Schema;
>> +import org.apache.avro.Schema.Field;
>> +import org.apache.avro.Schema.Type;
>> +import org.apache.avro.util.Utf8;
>> +import org.apache.gora.persistency.StatefulHashMap;
>> +import org.apache.gora.persistency.impl.PersistentBase;
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>> +
>> +public class CassandraSuperColumn extends CassandraColumn {
>> +  public static final Logger LOG = LoggerFactory.getLogger(CassandraSuperColumn.class);
>> +
>> +  private HSuperColumn<String, String, String> hSuperColumn;
>> +
>> +  public String getName() {
>> +    return hSuperColumn.getName();
>> +  }
>> +
>> +  public Object getValue() {
>> +    Field field = getField();
>> +    Schema fieldSchema = field.schema();
>> +    Type type = fieldSchema.getType();
>> +
>> +    Object value = null;
>> +
>> +    switch (type) {
>> +      case MAP:
>> +        Map<Utf8, Object> map = new StatefulHashMap<Utf8, Object>();
>> +        Type valueType = fieldSchema.getValueType().getType();
>> +
>> +        for (HColumn<String, String> hColumn : this.hSuperColumn.getColumns()) {
>> +          String memberString = hColumn.getValue();
>> +          Object memberValue = null;
>> +          switch (valueType) {
>> +            case STRING:
>> +              memberValue = new Utf8(memberString);
>> +              break;
>> +            case BYTES:
>> +              memberValue = CassandraSubColumn.getByteBuffer(memberString);
>> +              break;
>> +            default:
>> +              LOG.info("Type for the map value is not supported: " + valueType);
>> +
>> +          }
>> +          map.put(new Utf8(hColumn.getName()), memberValue);
>> +        }
>> +        value = map;
>> +
>> +        break;
>> +      case RECORD:
>> +        String fullName = fieldSchema.getFullName();
>> +
>> +        Class<?> claz = null;
>> +        try {
>> +          claz = Class.forName(fullName);
>> +        } catch (ClassNotFoundException cnfe) {
>> +          LOG.warn("Unable to load class " + fullName, cnfe);
>> +          break;
>> +        }
>> +
>> +        try {
>> +          value = claz.newInstance();
>> +        } catch (InstantiationException ie) {
>> +          LOG.warn("Instantiation error", ie);
>> +          break;
>> +        } catch (IllegalAccessException iae) {
>> +          LOG.warn("Illegal access error", iae);
>> +          break;
>> +        }
>> +
>> +        // we updated the value instance, now update its members
>> +        if (value instanceof PersistentBase) {
>> +          PersistentBase record = (PersistentBase) value;
>> +
>> +          for (HColumn<String, String> hColumn : this.hSuperColumn.getColumns()) {
>> +            Field memberField = fieldSchema.getField(hColumn.getName());
>> +            CassandraSubColumn cassandraColumn = new CassandraSubColumn();
>> +            cassandraColumn.setField(memberField);
>> +            cassandraColumn.setValue(hColumn);
>> +            record.put(record.getFieldIndex(hColumn.getName()), cassandraColumn.getValue());
>> +          }
>> +        }
>> +        break;
>> +      default:
>> +        LOG.info("Type not supported: " + type);
>> +    }
>> +
>> +    return value;
>> +  }
>> +
>> +  public void setValue(HSuperColumn<String, String, String> hSuperColumn) {
>> +    this.hSuperColumn = hSuperColumn;
>> +  }
>> +
>> +}
>>
>> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1149420&view=auto
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (added)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Fri Jul 22 00:33:59 2011
>> @@ -0,0 +1,253 @@
>> +package org.apache.gora.cassandra.store;
>> +
>> +import java.nio.ByteBuffer;
>> +import java.util.ArrayList;
>> +import java.util.Arrays;
>> +import java.util.HashMap;
>> +import java.util.List;
>> +import java.util.Map;
>> +
>> +import me.prettyprint.cassandra.serializers.StringSerializer;
>> +import me.prettyprint.cassandra.service.CassandraHostConfigurator;
>> +import me.prettyprint.hector.api.Cluster;
>> +import me.prettyprint.hector.api.Keyspace;
>> +import me.prettyprint.hector.api.Serializer;
>> +import me.prettyprint.hector.api.beans.OrderedRows;
>> +import me.prettyprint.hector.api.beans.OrderedSuperRows;
>> +import me.prettyprint.hector.api.beans.Row;
>> +import me.prettyprint.hector.api.beans.SuperRow;
>> +import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
>> +import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
>> +import me.prettyprint.hector.api.factory.HFactory;
>> +import me.prettyprint.hector.api.mutation.Mutator;
>> +import me.prettyprint.hector.api.query.QueryResult;
>> +import me.prettyprint.hector.api.query.RangeSlicesQuery;
>> +import me.prettyprint.hector.api.query.RangeSuperSlicesQuery;
>> +
>> +import org.apache.gora.cassandra.query.CassandraQuery;
>> +import org.apache.gora.mapreduce.GoraRecordReader;
>> +import org.apache.gora.persistency.Persistent;
>> +import org.apache.gora.query.Query;
>> +import org.apache.gora.util.ByteUtils;
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>> +
>> +public class CassandraClient<K, T extends Persistent> {
>> +  public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);
>> +
>> +  private Cluster cluster;
>> +  private Keyspace keyspace;
>> +  private Mutator<String> mutator;
>> +
>> +  private CassandraMapping cassandraMapping = new CassandraMapping();
>> +
>> +  private Serializer<String> stringSerializer = new StringSerializer();
>> +
>> +  public void init() throws Exception {
>> +    this.cassandraMapping.loadConfiguration();
>> +    this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), new CassandraHostConfigurator(this.cassandraMapping.getHostName()));
>> +
>> +    // add keyspace to cluster
>> +    checkKeyspace();
>> +
>> +    // Just create a Keyspace object on the client side, corresponding to an already existing keyspace with already created column families.
>> +    this.keyspace = HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster);
>> +
>> +    this.mutator = HFactory.createMutator(this.keyspace, this.stringSerializer);
>> +  }
>> +
>> +  /**
>> +   * Check if keyspace already exists. If not, create it.
>> +   */
>> +  public void checkKeyspace() {
>> +    // "describe keyspace <keyspaceName>;" query
>> +    KeyspaceDefinition keyspaceDefinition = this.cluster.describeKeyspace(this.cassandraMapping.getKeyspaceName());
>> +    if (keyspaceDefinition == null) {
>> +      List<ColumnFamilyDefinition> columnFamilyDefinitions = this.cassandraMapping.getColumnFamilyDefinitions();
>> +      keyspaceDefinition = HFactory.createKeyspaceDefinition(this.cassandraMapping.getKeyspaceName(), "org.apache.cassandra.locator.SimpleStrategy", 1, columnFamilyDefinitions);
>> +      this.cluster.addKeyspace(keyspaceDefinition);
>> +      LOG.info("Keyspace '" + this.cassandraMapping.getKeyspaceName() + "' in cluster '" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName() + "'");
>> +
>> +      keyspaceDefinition = null;
>> +    }
>> +
>> +
>> +  }
>> +
>> +  /**
>> +   * Drop keyspace.
>> +   */
>> +  public void dropKeyspace() {
>> +    // "drop keyspace <keyspaceName>;" query
>> +    this.cluster.dropKeyspace(this.cassandraMapping.getKeyspaceName());
>> +  }
>> +
>> +  /**
>> +   * Insert a field in a column.
>> +   * @param key the row key
>> +   * @param fieldName the field name
>> +   * @param value the field value.
>> +   */
>> +  public void addColumn(String key, String fieldName, Object value) {
>> +    if (value == null) {
>> +      return;
>> +    }
>> +    if (value instanceof ByteBuffer) {
>> +      value = toString((ByteBuffer) value);
>> +    }
>> +
>> +    String columnFamily = this.cassandraMapping.getFamily(fieldName);
>> +    String columnName = this.cassandraMapping.getColumn(fieldName);
>> +
>> +    this.mutator.insert(key, columnFamily, HFactory.createStringColumn(columnName, value.toString()));
>> +  }
>> +
>> +  /**
>> +   * TODO do not convert bytes to a string to store a binary field
>> +   * @param value
>> +   * @return
>> +   */
>> +  private static String toString(ByteBuffer value) {
>> +    ByteBuffer byteBuffer = (ByteBuffer) value;
>> +    return ByteUtils.toString(byteBuffer.array(), 0, byteBuffer.limit());
>> +  }
>> +
>> +  /**
>> +   * Insert a member in a super column. This is used for map and record Avro types.
>> +   * @param key the row key
>> +   * @param fieldName the field name
>> +   * @param memberName the member name
>> +   * @param value the member value
>> +   */
>> +  public void addSubColumn(String key, String fieldName, String memberName, Object value) {
>> +    if (value == null) {
>> +      return;
>> +    }
>> +
>> +    if (value instanceof ByteBuffer) {
>> +      value = toString((ByteBuffer) value);
>> +    }
>> +
>> +    String columnFamily = this.cassandraMapping.getFamily(fieldName);
>> +    String superColumnName = this.cassandraMapping.getColumn(fieldName);
>> +
>> +    this.mutator.insert(key, columnFamily, HFactory.createSuperColumn(superColumnName, Arrays.asList(HFactory.createStringColumn(memberName, value.toString())), this.stringSerializer, this.stringSerializer, this.stringSerializer));
>> +
>> +  }
>> +
>> +  /**
>> +   * Select a column family in the keyspace.
>> +   * @param cassandraQuery a wrapper of the query
>> +   * @param family the family name to be queried
>> +   * @return a list of family rows
>> +   */
>> +  public List<Row<String, String, String>> execute(CassandraQuery<K, T> cassandraQuery, String family) {
>> +
>> +    String[] columnNames = cassandraQuery.getColumns(family);
>> +    Query<K, T> query = cassandraQuery.getQuery();
>> +    int limit = (int) query.getLimit();
>> +    String startKey = (String) query.getStartKey();
>> +    String endKey = (String) query.getEndKey();
>> +
>> +    if (startKey == null) {
>> +      startKey = "";
>> +    }
>> +    if (endKey == null) {
>> +      endKey = "";
>> +    }
>> +
>> +
>> +    RangeSlicesQuery<String, String, String> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, this.stringSerializer, stringSerializer, stringSerializer);
>> +    rangeSlicesQuery.setColumnFamily(family);
>> +    rangeSlicesQuery.setKeys(startKey, endKey);
>> +    rangeSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
>> +    rangeSlicesQuery.setRowCount(limit);
>> +    rangeSlicesQuery.setColumnNames(columnNames);
>> +
>> +
>> +    QueryResult<OrderedRows<String, String, String>> queryResult = rangeSlicesQuery.execute();
>> +    OrderedRows<String, String, String> orderedRows = queryResult.get();
>> +
>> +
>> +    return orderedRows.getList();
>> +  }
>> +
>> +  /**
>> +   * Select the families that contain at least one column mapped to a query field.
>> +   * @param query indicates the columns to select
>> +   * @return a map whose keys are the family names and whose values are the column names required to get all the query fields.
>> +   */
>> +  public Map<String, List<String>> getFamilyMap(Query<K, T> query) {
>> +    Map<String, List<String>> map = new HashMap<String, List<String>>();
>> +    for (String field: query.getFields()) {
>> +      String family = this.cassandraMapping.getFamily(field);
>> +      String column = this.cassandraMapping.getColumn(field);
>> +
>> +      // check if the family value was already initialized
>> +      List<String> list = map.get(family);
>> +      if (list == null) {
>> +        list = new ArrayList<String>();
>> +        map.put(family, list);
>> +      }
>> +
>> +      if (column != null) {
>> +        list.add(column);
>> +      }
>> +
>> +    }
>> +
>> +    return map;
>> +  }
>> +
>> +  /**
>> +   * Select the field names according to the column names, whose format is fully qualified: "family:column"
>> +   * @param query
>> +   * @return a map whose keys are the fully qualified column names and whose values are the query fields
>> +   */
>> +  public Map<String, String> getReverseMap(Query<K, T> query) {
>> +    Map<String, String> map = new HashMap<String, String>();
>> +    for (String field: query.getFields()) {
>> +      String family = this.cassandraMapping.getFamily(field);
>> +      String column = this.cassandraMapping.getColumn(field);
>> +
>> +      map.put(family + ":" + column, field);
>> +    }
>> +
>> +    return map;
>> +
>> +  }
>> +
>> +  public boolean isSuper(String family) {
>> +    return this.cassandraMapping.isSuper(family);
>> +  }
>> +
>> +  public List<SuperRow<String, String, String, String>> executeSuper(CassandraQuery<K, T> cassandraQuery, String family) {
>> +    String[] columnNames = cassandraQuery.getColumns(family);
>> +    Query<K, T> query = cassandraQuery.getQuery();
>> +    int limit = (int) query.getLimit();
>> +    String startKey = (String) query.getStartKey();
>> +    String endKey = (String) query.getEndKey();
>> +
>> +    if (startKey == null) {
>> +      startKey = "";
>> +    }
>> +    if (endKey == null) {
>> +      endKey = "";
>> +    }
>> +
>> +
>> +    RangeSuperSlicesQuery<String, String, String, String> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace, this.stringSerializer, this.stringSerializer, this.stringSerializer, this.stringSerializer);
>> +    rangeSuperSlicesQuery.setColumnFamily(family);
>> +    rangeSuperSlicesQuery.setKeys(startKey, endKey);
>> +    rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
>> +    rangeSuperSlicesQuery.setRowCount(limit);
>> +    rangeSuperSlicesQuery.setColumnNames(columnNames);
>> +
>> +
>> +    QueryResult<OrderedSuperRows<String, String, String, String>> queryResult = rangeSuperSlicesQuery.execute();
>> +    OrderedSuperRows<String, String, String, String> orderedRows = queryResult.get();
>> +    return orderedRows.getList();
>> +
>> +
>> +  }
>> +}
>>
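To make the relation between getFamilyMap() and getReverseMap() in CassandraClient easier to follow, here is a stand-alone sketch of the two lookups. It does not use CassandraMapping: the field mapping is passed in as a plain map from field name to a {family, column} pair, which is an assumption for illustration, as are the class and method names. Skipping unmapped fields is also an assumption; the committed code would put a null family into the map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the lookups in CassandraClient; not the committed code.
final class FieldMappingSketch {
  private FieldMappingSketch() {}

  /** family -> column names needed to read the given fields. */
  static Map<String, List<String>> familyMap(Map<String, String[]> mapping, String[] fields) {
    Map<String, List<String>> result = new HashMap<String, List<String>>();
    for (String field : fields) {
      String[] target = mapping.get(field);   // target[0] = family, target[1] = column
      if (target == null) {
        continue;                             // assumption: unmapped fields are skipped
      }
      List<String> columns = result.get(target[0]);
      if (columns == null) {
        columns = new ArrayList<String>();
        result.put(target[0], columns);
      }
      columns.add(target[1]);
    }
    return result;
  }

  /** "family:column" -> field name, the key format CassandraResult looks up. */
  static Map<String, String> reverseMap(Map<String, String[]> mapping, String[] fields) {
    Map<String, String> result = new HashMap<String, String>();
    for (String field : fields) {
      String[] target = mapping.get(field);
      if (target == null) {
        continue;
      }
      result.put(target[0] + ":" + target[1], field);
    }
    return result;
  }
}
```

The first map drives one range query per family, and the second lets each returned column be routed back to its Avro field.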
>> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java?rev=1149420&r1=1149419&r2=1149420&view=diff
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java (original)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java Fri Jul 22 00:33:59 2011
>> @@ -1,50 +1,155 @@
>> -/**
>> - * Licensed to the Apache Software Foundation (ASF) under one
>> - * or more contributor license agreements.  See the NOTICE file
>> - * distributed with this work for additional information
>> - * regarding copyright ownership.  The ASF licenses this file
>> - * to you under the Apache License, Version 2.0 (the
>> - * "License"); you may not use this file except in compliance
>> - * with the License.  You may obtain a copy of the License at
>> - *
>> - *     http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> - * See the License for the specific language governing permissions and
>> - * limitations under the License.
>> - */
>>  package org.apache.gora.cassandra.store;
>>
>> +import java.io.IOException;
>> +import java.util.ArrayList;
>>  import java.util.HashMap;
>> +import java.util.List;
>>  import java.util.Map;
>> -import java.util.Set;
>> +
>> +import me.prettyprint.cassandra.model.BasicColumnFamilyDefinition;
>> +import me.prettyprint.cassandra.service.ThriftCfDef;
>> +import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
>> +import me.prettyprint.hector.api.ddl.ColumnType;
>> +import me.prettyprint.hector.api.ddl.ComparatorType;
>> +
>> +import org.jdom.Document;
>> +import org.jdom.Element;
>> +import org.jdom.JDOMException;
>> +import org.jdom.input.SAXBuilder;
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>>
>>  public class CassandraMapping {
>> +
>> +  public static final Logger LOG = LoggerFactory.getLogger(CassandraMapping.class);
>> +
>> +  private static final String MAPPING_FILE = "gora-cassandra-mapping.xml";
>> +  private static final String KEYSPACE_ELEMENT = "keyspace";
>> +  private static final String NAME_ATTRIBUTE = "name";
>> +  private static final String MAPPING_ELEMENT = "class";
>> +  private static final String COLUMN_ATTRIBUTE = "qualifier";
>> +  private static final String FAMILY_ATTRIBUTE = "family";
>> +  private static final String SUPER_ATTRIBUTE = "type";
>> +  private static final String CLUSTER_ATTRIBUTE = "cluster";
>> +  private static final String HOST_ATTRIBUTE = "host";
>> +
>> +
>> +  private String hostName;
>> +  private String clusterName;
>> +  private String keyspaceName;
>> +
>> +
>> +  /**
>> +   * List of the super column families.
>> +   */
>> +  private List<String> superFamilies = new ArrayList<String>();
>> +
>> +  /**
>> +   * Look up the column family associated to the Avro field.
>> +   */
>> +  private Map<String, String> familyMap = new HashMap<String, String>();
>> +
>> +  /**
>> +   * Look up the column associated to the Avro field.
>> +   */
>> +  private Map<String, String> columnMap = new HashMap<String, String>();
>> +
>> +  /**
>> +   * Look up the column family from its name.
>> +   */
>> +  private Map<String, BasicColumnFamilyDefinition> columnFamilyDefinitions = new HashMap<String, BasicColumnFamilyDefinition>();
>>
>> -  private String keySpace;
>> +  public String getHostName() {
>> +    return this.hostName;
>> +  }
>>
>> -  private Map<String, Boolean> families =
>> -    new HashMap<String, Boolean>();
>> +  public String getClusterName() {
>> +    return this.clusterName;
>> +  }
>>
>> -  public String getKeySpace() {
>> -    return keySpace;
>> +  public String getKeyspaceName() {
>> +    return this.keyspaceName;
>>   }
>>
>> -  public void setKeySpace(String keySpace) {
>> -    this.keySpace = keySpace;
>> +
>> +  @SuppressWarnings("unchecked")
>> +  public void loadConfiguration() throws JDOMException, IOException {
>> +    SAXBuilder saxBuilder = new SAXBuilder();
>> +    Document document = saxBuilder.build(getClass().getClassLoader().getResourceAsStream(MAPPING_FILE));
>> +    Element root = document.getRootElement();
>> +
>> +    Element keyspace = root.getChild(KEYSPACE_ELEMENT);
>> +    this.keyspaceName = keyspace.getAttributeValue(NAME_ATTRIBUTE);
>> +    this.clusterName = keyspace.getAttributeValue(CLUSTER_ATTRIBUTE);
>> +    this.hostName = keyspace.getAttributeValue(HOST_ATTRIBUTE);
>> +
>> +    // load column family definitions
>> +    List<Element> elements = keyspace.getChildren();
>> +    for (Element element: elements) {
>> +      BasicColumnFamilyDefinition cfDef = new BasicColumnFamilyDefinition();
>> +
>> +      String familyName = element.getAttributeValue(NAME_ATTRIBUTE);
>> +
>> +      String superAttribute = element.getAttributeValue(SUPER_ATTRIBUTE);
>> +      if (superAttribute != null) {
>> +        this.superFamilies.add(familyName);
>> +        cfDef.setColumnType(ColumnType.SUPER);
>> +        cfDef.setSubComparatorType(ComparatorType.UTF8TYPE);
>> +      }
>> +
>> +      cfDef.setKeyspaceName(this.keyspaceName);
>> +      cfDef.setName(familyName);
>> +      cfDef.setComparatorType(ComparatorType.UTF8TYPE);
>> +      cfDef.setDefaultValidationClass(ComparatorType.UTF8TYPE.getClassName());
>> +
>> +      this.columnFamilyDefinitions.put(familyName, cfDef);
>> +
>> +    }
>> +
>> +    // load column definitions
>> +    Element mapping = root.getChild(MAPPING_ELEMENT);
>> +    elements = mapping.getChildren();
>> +    for (Element element: elements) {
>> +      String fieldName = element.getAttributeValue(NAME_ATTRIBUTE);
>> +      String familyName = element.getAttributeValue(FAMILY_ATTRIBUTE);
>> +      String columnName = element.getAttributeValue(COLUMN_ATTRIBUTE);
>> +      BasicColumnFamilyDefinition columnFamilyDefinition = this.columnFamilyDefinitions.get(familyName);
>> +      if (columnFamilyDefinition == null) {
>> +        LOG.warn("Family " + familyName + " was not declared in the keyspace.");
>> +      }
>> +
>> +      this.familyMap.put(fieldName, familyName);
>> +      this.columnMap.put(fieldName, columnName);
>> +
>> +    }
>>   }
>>
>> -  public Set<String> getColumnFamilies() {
>> -    return families.keySet();
>> +  public String getFamily(String name) {
>> +    return this.familyMap.get(name);
>>   }
>>
>> -  public void addColumnFamily(String columnFamily, boolean isSuper) {
>> -    families.put(columnFamily, isSuper);
>> +  public String getColumn(String name) {
>> +    return this.columnMap.get(name);
>>   }
>>
>> -  public boolean isColumnFamilySuper(String columnFamily) {
>> -    return families.get(columnFamily);
>> +  /**
>> +   * Read family super attribute.
>> +   * @param family the family name
>> +   * @return true if the family is a super column family
>> +   */
>> +  public boolean isSuper(String family) {
>> +    return this.superFamilies.indexOf(family) != -1;
>>   }
>> +
>> +  public List<ColumnFamilyDefinition> getColumnFamilyDefinitions() {
>> +    List<ColumnFamilyDefinition> list = new ArrayList<ColumnFamilyDefinition>();
>> +    for (String key: this.columnFamilyDefinitions.keySet()) {
>> +      ColumnFamilyDefinition columnFamilyDefinition = this.columnFamilyDefinitions.get(key);
>> +      ThriftCfDef thriftCfDef = new ThriftCfDef(columnFamilyDefinition);
>> +      list.add(thriftCfDef);
>> +    }
>> +
>> +    return list;
>> +  }
>> +
>>  }
>>
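
For anyone reading along, here is a rough sketch of the mapping file shape that loadConfiguration() above seems to expect. The "keyspace" and "class" element names and the name/cluster/host/type/family/qualifier attributes come from the constants in the diff. The root element name, the "family" and "field" child element names, and all sample values are my own assumptions, since the code only calls getChildren() without naming them. The sketch uses the JDK DOM parser instead of JDOM so that it runs without extra jars; it is not the committed implementation.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.xml.parsers.DocumentBuilderFactory;

import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

public class MappingSketch {

  // Hypothetical mapping document. Only "keyspace", "class" and the
  // attribute names are taken from the commit; the rest is assumed.
  static final String SAMPLE =
      "<gora-orm>"
    + "  <keyspace name=\"Employee\" cluster=\"Test Cluster\" host=\"localhost\">"
    + "    <family name=\"p\"/>"
    + "    <family name=\"sc\" type=\"super\"/>"
    + "  </keyspace>"
    + "  <class name=\"Employee\">"
    + "    <field name=\"name\" family=\"p\" qualifier=\"nm\"/>"
    + "    <field name=\"boss\" family=\"sc\" qualifier=\"bs\"/>"
    + "  </class>"
    + "</gora-orm>";

  final List<String> superFamilies = new ArrayList<String>();
  final Map<String, String> familyMap = new HashMap<String, String>();
  final Map<String, String> columnMap = new HashMap<String, String>();
  String keyspaceName;

  // Direct child elements only (whitespace text nodes are skipped).
  static List<Element> children(Element parent) {
    List<Element> result = new ArrayList<Element>();
    NodeList nodes = parent.getChildNodes();
    for (int i = 0; i < nodes.getLength(); i++) {
      Node node = nodes.item(i);
      if (node instanceof Element) {
        result.add((Element) node);
      }
    }
    return result;
  }

  // First direct child element with the given tag name, or null.
  static Element child(Element parent, String name) {
    for (Element e : children(parent)) {
      if (e.getTagName().equals(name)) {
        return e;
      }
    }
    return null;
  }

  static MappingSketch load(String xml) throws Exception {
    Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
        .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
    Element root = doc.getDocumentElement();
    MappingSketch m = new MappingSketch();

    // Column family declarations: any "type" attribute marks a super family,
    // mirroring the null check on the attribute in the commit.
    Element keyspace = child(root, "keyspace");
    m.keyspaceName = keyspace.getAttribute("name");
    for (Element family : children(keyspace)) {
      if (family.hasAttribute("type")) {
        m.superFamilies.add(family.getAttribute("name"));
      }
    }

    // Field declarations: Avro field name -> family and -> column.
    for (Element field : children(child(root, "class"))) {
      String fieldName = field.getAttribute("name");
      m.familyMap.put(fieldName, field.getAttribute("family"));
      m.columnMap.put(fieldName, field.getAttribute("qualifier"));
    }
    return m;
  }

  boolean isSuper(String family) {
    return this.superFamilies.contains(family);
  }

  public static void main(String[] args) throws Exception {
    MappingSketch m = load(SAMPLE);
    System.out.println(m.keyspaceName + " " + m.isSuper("sc") + " " + m.familyMap.get("name"));
  }
}
```

As in the commit, the value of the type attribute is never inspected; its mere presence marks the family as super.
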
>> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1149420&r1=1149419&r2=1149420&view=diff
>> ==============================================================================
>> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
>> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Fri Jul 22 00:33:59 2011
>> @@ -1,465 +1,334 @@
>> -/**
>> - * Licensed to the Apache Software Foundation (ASF) under one
>> - * or more contributor license agreements.  See the NOTICE file
>> - * distributed with this work for additional information
>> - * regarding copyright ownership.  The ASF licenses this file
>> - * to you under the Apache License, Version 2.0 (the
>> - * "License"); you may not use this file except in compliance
>> - * with the License.  You may obtain a copy of the License at
>> - *
>> - *     http://www.apache.org/licenses/LICENSE-2.0
>> - *
>> - * Unless required by applicable law or agreed to in writing, software
>> - * distributed under the License is distributed on an "AS IS" BASIS,
>> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> - * See the License for the specific language governing permissions and
>> - * limitations under the License.
>> - */
>>  package org.apache.gora.cassandra.store;
>>
>>  import java.io.IOException;
>> -import java.net.InetAddress;
>>  import java.util.ArrayList;
>>  import java.util.HashMap;
>>  import java.util.List;
>>  import java.util.Map;
>> -import java.util.Map.Entry;
>> -import java.util.Properties;
>> -import java.util.Set;
>> +
>> +import me.prettyprint.hector.api.beans.ColumnSlice;
>> +import me.prettyprint.hector.api.beans.HColumn;
>> +import me.prettyprint.hector.api.beans.HSuperColumn;
>> +import me.prettyprint.hector.api.beans.Row;
>> +import me.prettyprint.hector.api.beans.SuperRow;
>> +import me.prettyprint.hector.api.beans.SuperSlice;
>>
>>  import org.apache.avro.Schema;
>>  import org.apache.avro.Schema.Field;
>>  import org.apache.avro.Schema.Type;
>>  import org.apache.avro.generic.GenericArray;
>>  import org.apache.avro.util.Utf8;
>> -import org.apache.cassandra.thrift.TokenRange;
>> -import org.apache.gora.cassandra.client.CassandraClient;
>> -import org.apache.gora.cassandra.client.Mutate;
>> -import org.apache.gora.cassandra.client.Row;
>> -import org.apache.gora.cassandra.client.Select;
>> -import org.apache.gora.cassandra.client.SimpleCassandraClient;
>> -import org.apache.gora.cassandra.query.CassandraPartitionQuery;
>>  import org.apache.gora.cassandra.query.CassandraQuery;
>>  import org.apache.gora.cassandra.query.CassandraResult;
>> -import org.apache.gora.persistency.ListGenericArray;
>> +import org.apache.gora.cassandra.query.CassandraResultSet;
>> +import org.apache.gora.cassandra.query.CassandraRow;
>> +import org.apache.gora.cassandra.query.CassandraSubColumn;
>> +import org.apache.gora.cassandra.query.CassandraSuperColumn;
>>  import org.apache.gora.persistency.Persistent;
>> -import org.apache.gora.persistency.State;
>> -import org.apache.gora.persistency.StateManager;
>>  import org.apache.gora.persistency.StatefulHashMap;
>> -import org.apache.gora.persistency.StatefulMap;
>> +import org.apache.gora.persistency.impl.PersistentBase;
>> +import org.apache.gora.persistency.impl.StateManagerImpl;
>>  import org.apache.gora.query.PartitionQuery;
>>  import org.apache.gora.query.Query;
>>  import org.apache.gora.query.Result;
>> -import org.apache.gora.store.DataStoreFactory;
>> +import org.apache.gora.query.impl.PartitionQueryImpl;
>>  import org.apache.gora.store.impl.DataStoreBase;
>> -import org.apache.gora.util.ByteUtils;
>> -import org.jdom.Document;
>> -import org.jdom.Element;
>> -import org.jdom.input.SAXBuilder;
>> -
>> -/**
>> - * DataStore for Cassandra.
>> - *
>> - * <p> Note: CassandraStore is not thread-safe. </p>
>> - */
>> -public class CassandraStore<K, T extends Persistent>
>> -extends DataStoreBase<K, T> {
>> -
>> -  private static final String ERROR_MESSAGE =
>> -    "Cassandra does not support creating or modifying ColumnFamilies during runtime";
>> -
>> -  private static final String DEFAULT_MAPPING_FILE = "gora-cassandra-mapping.xml";
>> -
>> -  private static final int SPLIT_SIZE = 65536;
>> -
>> -  private static final int BATCH_COUNT = 256;
>> -
>> -  private CassandraClient client;
>> -
>> -  private Map<String, CassandraColumn> columnMap;
>> +import org.slf4j.Logger;
>> +import org.slf4j.LoggerFactory;
>>
>> -  private CassandraMapping mapping;
>> -
>> -  @Override
>> -  public void initialize(Class<K> keyClass, Class<T> persistentClass,
>> -      Properties properties) throws IOException {
>> -    super.initialize(keyClass, persistentClass, properties);
>> -
>> -    String mappingFile =
>> -      DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
>> -
>> -    readMapping(mappingFile);
>> +public class CassandraStore<K, T extends Persistent> extends DataStoreBase<K, T> {
>> +  public static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
>> +
>> +  private CassandraClient<K, T>  cassandraClient = new CassandraClient<K, T>();
>> +
>> +  /**
>> +   * The values are Avro fields pending to be stored.
>> +   */
>> +  private Map<K, T> buffer = new HashMap<K, T>();
>> +
>> +  public CassandraStore() throws Exception {
>> +    this.cassandraClient.init();
>>   }
>>
>>   @Override
>> -  public String getSchemaName() {
>> -    return mapping.getKeySpace();
>> +  public void close() throws IOException {
>> +    LOG.debug("close");
>> +    flush();
>>   }
>>
>>   @Override
>> -  public void createSchema() throws IOException {
>> -    throw new UnsupportedOperationException(ERROR_MESSAGE);
>> +  public void createSchema() {
>> +    LOG.debug("create schema");
>> +    this.cassandraClient.checkKeyspace();
>>   }
>>
>>   @Override
>> -  public void deleteSchema() throws IOException {
>> -    throw new UnsupportedOperationException(ERROR_MESSAGE);
>> +  public boolean delete(K key) throws IOException {
>> +    LOG.debug("delete " + key);
>> +    return false;
>>   }
>>
>>   @Override
>> -  public boolean schemaExists() throws IOException {
>> -    return true;
>> +  public long deleteByQuery(Query<K, T> query) throws IOException {
>> +    LOG.debug("delete by query " + query);
>> +    return 0;
>>   }
>>
>> -  public CassandraClient getClientByLocation(String endPoint) {
>> -    return client;
>> +  @Override
>> +  public void deleteSchema() throws IOException {
>> +    LOG.debug("delete schema");
>> +    this.cassandraClient.dropKeyspace();
>>   }
>>
>> -  public Select createSelect(String[] fields) {
>> -    Select select = new Select();
>> -    if (fields == null) {
>> -      fields = beanFactory.getCachedPersistent().getFields();
>> -    }
>> -    for (String f : fields) {
>> -      CassandraColumn col = columnMap.get(f);
>> -      Schema fieldSchema = fieldMap.get(f).schema();
>> -      switch (fieldSchema.getType()) {
>> -        case MAP:
>> -        case ARRAY:
>> -          if (col.isSuperColumn()) {
>> -            select.addAllColumnsForSuperColumn(col.family, col.superColumn);
>> -          } else {
>> -            select.addColumnAll(col.family);
>> -          }
>> -          break;
>> -        default:
>> -          if (col.isSuperColumn()) {
>> -            select.addColumnName(col.family, col.superColumn, col.column);
>> -          } else {
>> -            select.addColumnName(col.family, col.column);
>> -          }
>> -          break;
>> +  @Override
>> +  public Result<K, T> execute(Query<K, T> query) throws IOException {
>> +
>> +    Map<String, List<String>> familyMap = this.cassandraClient.getFamilyMap(query);
>> +    Map<String, String> reverseMap = this.cassandraClient.getReverseMap(query);
>> +
>> +    CassandraQuery<K, T> cassandraQuery = new CassandraQuery<K, T>();
>> +    cassandraQuery.setQuery(query);
>> +    cassandraQuery.setFamilyMap(familyMap);
>> +
>> +    CassandraResult<K, T> cassandraResult = new CassandraResult<K, T>(this, query);
>> +    cassandraResult.setReverseMap(reverseMap);
>> +
>> +    CassandraResultSet cassandraResultSet = new CassandraResultSet();
>> +
>> +    // We query Cassandra keyspace by families.
>> +    for (String family : familyMap.keySet()) {
>> +      if (this.cassandraClient.isSuper(family)) {
>> +        addSuperColumns(family, cassandraQuery, cassandraResultSet);
>> +
>> +      } else {
>> +        addSubColumns(family, cassandraQuery, cassandraResultSet);
>> +
>>       }
>> +
>>     }
>> -    return select;
>> -  }
>> +
>> +    cassandraResult.setResultSet(cassandraResultSet);
>> +
>> +
>> +    return cassandraResult;
>>
>> -  @Override
>> -  public T get(K key, String[] fields) throws IOException {
>> -    if (fields == null) {
>> -      fields = beanFactory.getCachedPersistent().getFields();
>> -    }
>> -    Select select = createSelect(fields);
>> -    try {
>> -      Row result = client.get(key.toString(), select);
>> -      return newInstance(result, fields);
>> -    } catch (Exception e) {
>> -      throw new IOException(e);
>> -    }
>>   }
>>
>> -  @SuppressWarnings("rawtypes")
>> -  private void setField(T persistent, Field field, StatefulMap map) {
>> -    persistent.put(field.pos(), map);
>> -  }
>> -
>> -  private void setField(T persistent, Field field, byte[] val)
>> -  throws IOException {
>> -    persistent.put(field.pos()
>> -        , ByteUtils.fromBytes(val, field.schema(), datumReader, persistent.get(field.pos())));
>> -  }
>> -
>> -  @SuppressWarnings("rawtypes")
>> -  private void setField(T persistent, Field field, GenericArray list) {
>> -    persistent.put(field.pos(), list);
>> -  }
>> -
>> -  @SuppressWarnings({ "rawtypes", "unchecked" })
>> -  public T newInstance(Row result, String[] fields)
>> -  throws IOException {
>> -    if(result == null)
>> -      return null;
>> -
>> -    T persistent = newPersistent();
>> -    StateManager stateManager = persistent.getStateManager();
>> -    for (String f : fields) {
>> -      CassandraColumn col = columnMap.get(f);
>> -      Field field = fieldMap.get(f);
>> -      Schema fieldSchema = field.schema();
>> -      Map<String, byte[]> qualMap;
>> -      switch(fieldSchema.getType()) {
>> -        case MAP:
>> -          if (col.isSuperColumn()) {
>> -            qualMap = result.getSuperColumn(col.family, col.superColumn);
>> -          } else {
>> -            qualMap = result.getColumn(col.family);
>> -          }
>> -          if (qualMap == null) {
>> -            continue;
>> -          }
>> -          Schema valueSchema = fieldSchema.getValueType();
>> -          StatefulMap map = new StatefulHashMap();
>> -          for (Entry<String, byte[]> e : qualMap.entrySet()) {
>> -            Utf8 mapKey = new Utf8(e.getKey());
>> -            map.put(mapKey, ByteUtils.fromBytes(e.getValue(), valueSchema, datumReader, null));
>> -            map.putState(mapKey, State.CLEAN);
>> -          }
>> -          setField(persistent, field, map);
>> -          break;
>> -        case ARRAY:
>> -          if (col.isSuperColumn()) {
>> -            qualMap = result.getSuperColumn(col.family, col.superColumn);
>> -          } else {
>> -            qualMap = result.getColumn(col.family);
>> -          }
>> -          if (qualMap == null) {
>> -            continue;
>> -          }
>> -          valueSchema = fieldSchema.getElementType();
>> -          ArrayList arrayList = new ArrayList();
>> -          for (Entry<String, byte[]> e : qualMap.entrySet()) {
>> -            arrayList.add(ByteUtils.fromBytes(e.getValue(), valueSchema, datumReader, null));
>> -          }
>> -          ListGenericArray arr = new ListGenericArray(fieldSchema, arrayList);
>> -          setField(persistent, field, arr);
>> -          break;
>> -        default:
>> -          byte[] val;
>> -          if (col.isSuperColumn()) {
>> -            val = result.get(col.family, col.superColumn, col.column);
>> -          } else {
>> -            val = result.get(col.family, col.column);
>> -          }
>> -          if (val == null) {
>> -            continue;
>> -          }
>> -          setField(persistent, field, val);
>> -          break;
>> +  private void addSubColumns(String family, CassandraQuery<K, T> cassandraQuery,
>> +      CassandraResultSet cassandraResultSet) {
>> +    // select family columns that are included in the query
>> +    List<Row<String, String, String>> rows = this.cassandraClient.execute(cassandraQuery, family);
>> +
>> +    for (Row<String, String, String> row : rows) {
>> +      String key = row.getKey();
>> +
>> +      // find associated row in the resultset
>> +      CassandraRow cassandraRow = cassandraResultSet.getRow(key);
>> +      if (cassandraRow == null) {
>> +        cassandraRow = new CassandraRow();
>> +        cassandraResultSet.putRow(key, cassandraRow);
>> +        cassandraRow.setKey(key);
>>       }
>> +
>> +      ColumnSlice<String, String> columnSlice = row.getColumnSlice();
>> +
>> +      for (HColumn<String, String> hColumn : columnSlice.getColumns()) {
>> +        CassandraSubColumn cassandraSubColumn = new CassandraSubColumn();
>> +        cassandraSubColumn.setValue(hColumn);
>> +        cassandraSubColumn.setFamily(family);
>> +        cassandraRow.add(cassandraSubColumn);
>> +      }
>> +
>>     }
>> -    stateManager.clearDirty(persistent);
>> -    return persistent;
>>   }
>>
>> -  @Override
>> -  public void put(K key, T obj) throws IOException {
>> -    Mutate mutate = new Mutate();
>> -    Schema schema = obj.getSchema();
>> -    StateManager stateManager = obj.getStateManager();
>> -    List<Field> fields = schema.getFields();
>> -    String qual;
>> -    byte[] value;
>> -    for (int i = 0; i < fields.size(); i++) {
>> -      if (!stateManager.isDirty(obj, i)) {
>> -        continue;
>> +  private void addSuperColumns(String family, CassandraQuery<K, T> cassandraQuery,
>> +      CassandraResultSet cassandraResultSet) {
>> +
>> +    List<SuperRow<String, String, String, String>> superRows = this.cassandraClient.executeSuper(cassandraQuery, family);
>> +    for (SuperRow<String, String, String, String> superRow: superRows) {
>> +      String key = superRow.getKey();
>> +      CassandraRow cassandraRow = cassandraResultSet.getRow(key);
>> +      if (cassandraRow == null) {
>> +        cassandraRow = new CassandraRow();
>> +        cassandraResultSet.putRow(key, cassandraRow);
>> +        cassandraRow.setKey(key);
>>       }
>> -      Field field = fields.get(i);
>> -      Type type = field.schema().getType();
>> -      Object o = obj.get(i);
>> -      CassandraColumn col = columnMap.get(field.name());
>> -
>> -      switch(type) {
>> -      case MAP:
>> -        if(o instanceof StatefulMap) {
>> -          @SuppressWarnings("unchecked")
>> -          StatefulMap<Utf8, ?> map = (StatefulMap<Utf8, ?>) o;
>> -          for (Entry<Utf8, State> e : map.states().entrySet()) {
>> -            Utf8 mapKey = e.getKey();
>> -            switch (e.getValue()) {
>> -            case DIRTY:
>> -              qual = mapKey.toString();
>> -              value = ByteUtils.toBytes(map.get(mapKey), field.schema().getValueType(), datumWriter);
>> -              if (col.isSuperColumn()) {
>> -                mutate.put(col.family, col.superColumn, qual, value);
>> -              } else {
>> -                mutate.put(col.family, qual, value);
>> -              }
>> -              break;
>> -            case DELETED:
>> -              qual = mapKey.toString();
>> -              if (col.isSuperColumn()) {
>> -                mutate.delete(col.family, col.superColumn, qual);
>> -              } else {
>> -                mutate.delete(col.family, qual);
>> -              }
>> -              break;
>> -            }
>> -          }
>> -        } else {
>> -          @SuppressWarnings({ "rawtypes", "unchecked" })
>> -          Set<Map.Entry> set = ((Map)o).entrySet();
>> -          for(@SuppressWarnings("rawtypes") Entry entry: set) {
>> -            qual = entry.getKey().toString();
>> -            value = ByteUtils.toBytes(entry.getValue().toString());
>> -            if (col.isSuperColumn()) {
>> -              mutate.put(col.family, col.superColumn, qual, value);
>> -            } else {
>> -              mutate.put(col.family, qual, value);
>> -            }
>> -          }
>> -        }
>> -        break;
>> -      case ARRAY:
>> -        if(o instanceof GenericArray) {
>> -          @SuppressWarnings("rawtypes")
>> -          GenericArray arr = (GenericArray) o;
>> -          int j=0;
>> -          for(Object item : arr) {
>> -            value = ByteUtils.toBytes(item.toString());
>> -            if (col.isSuperColumn()) {
>> -              mutate.put(col.family, col.superColumn, Integer.toString(j), value);
>> -            } else {
>> -              mutate.put(col.family, Integer.toString(j), value);
>> -            }
>> -            j++;
>> -          }
>> -        }
>> -        break;
>> -      default:
>> -        value = ByteUtils.toBytes(o, field.schema(), datumWriter);
>> -        if (col.isSuperColumn()) {
>> -          mutate.put(col.family, col.superColumn, col.column, value);
>> -        } else {
>> -          mutate.put(col.family, col.column, value);
>> -        }
>> -        break;
>> +
>> +      SuperSlice<String, String, String> superSlice = superRow.getSuperSlice();
>> +      for (HSuperColumn<String, String, String> hSuperColumn: superSlice.getSuperColumns()) {
>> +        CassandraSuperColumn cassandraSuperColumn = new CassandraSuperColumn();
>> +        cassandraSuperColumn.setValue(hSuperColumn);
>> +        cassandraSuperColumn.setFamily(family);
>> +        cassandraRow.add(cassandraSuperColumn);
>>       }
>>     }
>> -
>> -    if(!mutate.isEmpty())
>> -      client.mutate(key.toString(), mutate);
>>   }
>>
>> -  @Override
>> -  public boolean delete(K key) throws IOException {
>> -    Mutate mutate = new Mutate();
>> -    for (String family : mapping.getColumnFamilies()) {
>> -      mutate.deleteAll(family);
>> +  /**
>> +   * Flush the buffer. Write the buffered rows.
>> +   * @see org.apache.gora.store.DataStore#flush()
>> +   */
>> +  @Override
>> +  public void flush() throws IOException {
>> +    for (K key: this.buffer.keySet()) {
>> +      T value = this.buffer.get(key);
>> +      Schema schema = value.getSchema();
>> +      for (Field field: schema.getFields()) {
>> +        if (value.isDirty(field.pos())) {
>> +          addOrUpdateField((String) key, field, value.get(field.pos()));
>> +        }
>> +      }
>>     }
>> -
>> -    client.mutate(key.toString(), mutate);
>> -    return true;
>> +
>> +    this.buffer.clear();
>>   }
>>
>>   @Override
>> -  public void flush() throws IOException { }
>> -
>> -  @Override
>> -  public void close() throws IOException {
>> -    client.close();
>> +  public T get(K key, String[] fields) throws IOException {
>> +    LOG.info("get " + key);
>> +    return null;
>>   }
>>
>>   @Override
>> -  public Query<K, T> newQuery() {
>> -    return new CassandraQuery<K, T>(this);
>> +  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
>> +      throws IOException {
>> +    // just a single partition
>> +    List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>();
>> +    partitions.add(new PartitionQueryImpl<K,T>(query));
>> +    return partitions;
>>   }
>>
>>   @Override
>> -  public long deleteByQuery(Query<K, T> query) throws IOException {
>> -    // TODO Auto-generated method stub
>> -    return 0;
>> +  public String getSchemaName() {
>> +    LOG.info("get schema name");
>> +    return null;
>>   }
>>
>>   @Override
>> -  public Result<K, T> execute(Query<K, T> query) throws IOException {
>> -    return new CassandraResult<K, T>(this, query, BATCH_COUNT);
>> +  public Query<K, T> newQuery() {
>> +    return new CassandraQuery<K, T>(this);
>>   }
>>
>> -  @Override
>> -  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
>> -  throws IOException {
>> -    List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>();
>> -
>> -    List<TokenRange> rangeList = client.describeRing();
>> -    for (TokenRange range : rangeList) {
>> -      List<String> tokens =
>> -        client.describeSplits(range.start_token, range.end_token, SPLIT_SIZE);
>> -      // turn the sub-ranges into InputSplits
>> -      String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
>> -      // hadoop needs hostname, not ip
>> -      for (int i = 0; i < endpoints.length; i++) {
>> -          endpoints[i] = InetAddress.getByName(endpoints[i]).getHostName();
>> -      }
>> -
>> -      for (int i = 1; i < tokens.size(); i++) {
>> -        CassandraPartitionQuery<K, T> partitionQuery =
>> -          new CassandraPartitionQuery<K, T>(query, tokens.get(i - 1), tokens.get(i), endpoints, SPLIT_SIZE);
>> -        partitions.add(partitionQuery);
>> +  /**
>> +   * Duplicate instance to keep all the objects in memory till flushing.
>> +   * @see org.apache.gora.store.DataStore#put(java.lang.Object, org.apache.gora.persistency.Persistent)
>> +   */
>> +  @Override
>> +  public void put(K key, T value) throws IOException {
>> +    T p = (T) value.newInstance(new StateManagerImpl());
>> +    Schema schema = value.getSchema();
>> +    for (Field field: schema.getFields()) {
>> +      if (value.isDirty(field.pos())) {
>> +        Object fieldValue = value.get(field.pos());
>> +
>> +        // check if field has a nested structure (map or record)
>> +        Schema fieldSchema = field.schema();
>> +        Type type = fieldSchema.getType();
>> +        switch(type) {
>> +          case RECORD:
>> +            Persistent persistent = (Persistent) fieldValue;
>> +            Persistent newRecord = persistent.newInstance(new StateManagerImpl());
>> +            for (Field member: fieldSchema.getFields()) {
>> +              newRecord.put(member.pos(), persistent.get(member.pos()));
>> +            }
>> +            fieldValue = newRecord;
>> +            break;
>> +          case MAP:
>> +            StatefulHashMap<?, ?> map = (StatefulHashMap<?, ?>) fieldValue;
>> +            StatefulHashMap<?, ?> newMap = new StatefulHashMap(map);
>> +            fieldValue = newMap;
>> +            break;
>> +        }
>> +
>> +        p.put(field.pos(), fieldValue);
>>       }
>>     }
>> -    return partitions;
>> -  }
>> -
>> -  private CassandraClient createClient() throws IOException {
>> -    String serverStr =
>> -      DataStoreFactory.findPropertyOrDie(properties, this, "servers");
>> -    String[] server1Parts = serverStr.split(",")[0].split(":");
>> -    try {
>> -      return new SimpleCassandraClient(server1Parts[0],
>> -          Integer.parseInt(server1Parts[1]), mapping.getKeySpace());
>> -    } catch (Exception e) {
>> -      throw new IOException(e);
>> -    }
>> -  }
>> -
>> -  @SuppressWarnings("unchecked")
>> -  protected void readMapping(String filename) throws IOException {
>> -
>> -    mapping = new CassandraMapping();
>> -    columnMap = new HashMap<String, CassandraColumn>();
>> -
>> -    try {
>> -      SAXBuilder builder = new SAXBuilder();
>> -      Document doc = builder.build(getClass().getClassLoader()
>> -          .getResourceAsStream(filename));
>> -
>> -      List<Element> classes = doc.getRootElement().getChildren("class");
>> -
>> -      for(Element classElement: classes) {
>> -        if(classElement.getAttributeValue("keyClass").equals(keyClass.getCanonicalName())
>> -            && classElement.getAttributeValue("name").equals(
>> -                persistentClass.getCanonicalName())) {
>> -
>> -          String keySpace = classElement.getAttributeValue("keyspace");
>> -          mapping.setKeySpace(keySpace);
>> -          client = createClient();
>> -          Map<String, Map<String, String>> keySpaceDesc = client.describeKeySpace();
>> -          for (Entry<String, Map<String, String>> e : keySpaceDesc.entrySet()) {
>> -            boolean isSuper = e.getValue().get("Type").equals("Super");
>> -            mapping.addColumnFamily(e.getKey(), isSuper);
>> -          }
>> -
>> -          List<Element> fields = classElement.getChildren("field");
>> -
>> -          for(Element field:fields) {
>> -            String fieldName = field.getAttributeValue("name");
>> -            String path = field.getAttributeValue("path");
>> -            String[] parts = path.split(":");
>> -            String columnFamily = parts[0];
>> -            String superColumn = null;
>> -            String column = null;
>> -
>> -            boolean isSuper = mapping.isColumnFamilySuper(columnFamily);
>> -            if (isSuper) {
>> -              superColumn = parts[1];
>> -              if (parts.length == 3) {
>> -                column = parts[2];
>> +
>> +    this.buffer.put(key, p);
>> + }
>> +
>> +  /**
>> +   * Add a field to Cassandra according to its type.
>> +   * @param key     the key of the row where the field should be added
>> +   * @param field   the Avro field representing a datum
>> +   * @param value   the field value
>> +   */
>> +  private void addOrUpdateField(String key, Field field, Object value) {
>> +    Schema schema = field.schema();
>> +    Type type = schema.getType();
>> +    //LOG.info(field.name() + " " + type.name());
>> +    switch (type) {
>> +      case STRING:
>> +        this.cassandraClient.addColumn(key, field.name(), value);
>> +        break;
>> +      case INT:
>> +        this.cassandraClient.addColumn(key, field.name(), value);
>> +        break;
>> +      case LONG:
>> +        this.cassandraClient.addColumn(key, field.name(), value);
>> +        break;
>> +      case BYTES:
>> +        this.cassandraClient.addColumn(key, field.name(), value);
>> +        break;
>> +      case FLOAT:
>> +        this.cassandraClient.addColumn(key, field.name(), value);
>> +        break;
>> +      case RECORD:
>> +        if (value != null) {
>> +          if (value instanceof PersistentBase) {
>> +            PersistentBase persistentBase = (PersistentBase) value;
>> +            for (Field member: schema.getFields()) {
>> +
>> +              // TODO: hack, do not store empty arrays
>> +              Object memberValue = persistentBase.get(member.pos());
>> +              if (memberValue instanceof GenericArray<?>) {
>> +                GenericArray<String> array = (GenericArray<String>) memberValue;
>> +                if (array.size() == 0) {
>> +                  continue;
>> +                }
>>               }
>> -            } else {
>> -              if (parts.length == 2) {
>> -                column = parts[1];
>> +
>> +              this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
>> +            }
>> +          } else {
>> +            LOG.info("Record not supported: " + value.toString());
>> +
>> +          }
>> +        }
>> +        break;
>> +      case MAP:
>> +        if (value != null) {
>> +          if (value instanceof StatefulHashMap<?, ?>) {
>> +            //TODO cast to stateful map and only write dirty keys
>> +            Map<Utf8, Object> map = (Map<Utf8, Object>) value;
>> +            for (Utf8 mapKey: map.keySet()) {
>> +
>> +              // TODO: hack, do not store empty arrays
>> +              Object keyValue = map.get(mapKey);
>> +              if (keyValue instanceof GenericArray<?>) {
>> +                GenericArray<String> array = (GenericArray<String>) keyValue;
>> +                if (array.size() == 0) {
>> +                  continue;
>> +                }
>>               }
>> +
>> +              this.cassandraClient.addSubColumn(key, field.name(), mapKey.toString(), keyValue);
>>             }
>> -
>> -            columnMap.put(fieldName,
>> -                new CassandraColumn(columnFamily, superColumn, column));
>> +          } else {
>> +            LOG.info("Map not supported: " + value.toString());
>>           }
>> -
>> -          break;
>>         }
>> -      }
>> -    } catch(Exception ex) {
>> -      throw new IOException(ex);
>> +        break;
>> +      default:
>> +        LOG.info("Type not considered: " + type.name());
>>     }
>>   }
>> -}
>> \ No newline at end of file
>> +
>> +  @Override
>> +  public boolean schemaExists() throws IOException {
>> +    LOG.info("schema exists");
>> +    return false;
>> +  }
>> +
>> +}
>>
>> Modified: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java
>> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java?rev=1149420&r1=1149419&r2=1149420&view=diff
>> ==============================================================================
>> --- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java (original)
>> +++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java Fri Jul 22 00:33:59 2011
>> @@ -36,8 +36,8 @@ import org.slf4j.LoggerFactory;
>>  public class GoraRecordReader<K, T extends Persistent> extends RecordReader<K,T> {
>>   public static final Logger LOG = LoggerFactory.getLogger(GoraRecordReader.class);
>>
>> -  private static final String BUFFER_LIMIT_READ_NAME = "gora.buffer.read.limit";
>> -  private static final int BUFFER_LIMIT_READ_VALUE = 10000;
>> +  public static final String BUFFER_LIMIT_READ_NAME = "gora.buffer.read.limit";
>> +  public static final int BUFFER_LIMIT_READ_VALUE = 10000;
>>
>>   protected Query<K,T> query;
>>   protected Result<K,T> result;
>>
>>
>>
>
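
One small note on addOrUpdateField in the quoted diff: the STRING, INT, LONG, BYTES and FLOAT cases all make the same addColumn call, so they could share one body through case fall-through. Below is a minimal, self-contained sketch of that idea. It is not the committed code: the Type enum, RecordingClient and the class name are hypothetical stand-ins for Avro's Schema.Type and the CassandraClient added in this commit, and the empty-collection check uses java.util.List in place of GenericArray.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FieldDispatchSketch {

  // Hypothetical stand-in for org.apache.avro.Schema.Type (subset only).
  enum Type { STRING, INT, LONG, BYTES, FLOAT, MAP, BOOLEAN }

  // Hypothetical stand-in for CassandraClient: records calls instead of writing.
  static final class RecordingClient {
    final List<String> calls = new ArrayList<>();

    void addColumn(String key, String fieldName, Object value) {
      calls.add("column:" + key + "/" + fieldName + "=" + value);
    }

    void addSubColumn(String key, String fieldName, String memberName, Object value) {
      calls.add("subcolumn:" + key + "/" + fieldName + "/" + memberName + "=" + value);
    }
  }

  static void addOrUpdateField(RecordingClient client, String key,
                               String fieldName, Type type, Object value) {
    switch (type) {
      // All simple types are written the same way, so the labels share one body.
      case STRING:
      case INT:
      case LONG:
      case BYTES:
      case FLOAT:
        client.addColumn(key, fieldName, value);
        break;
      case MAP:
        if (value instanceof Map<?, ?>) {
          // Iterating entries avoids a second lookup per key.
          for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
            Object entryValue = entry.getValue();
            // Same "do not store empty arrays" rule as the commit, applied to List here.
            if (entryValue instanceof List<?> && ((List<?>) entryValue).isEmpty()) {
              continue;
            }
            client.addSubColumn(key, fieldName, String.valueOf(entry.getKey()), entryValue);
          }
        }
        break;
      default:
        // Unhandled types are only recorded, mirroring the LOG.info in the commit.
        client.calls.add("skipped:" + type.name());
    }
  }

  public static void main(String[] args) {
    RecordingClient client = new RecordingClient();
    addOrUpdateField(client, "row1", "title", Type.STRING, "hello");
    addOrUpdateField(client, "row1", "score", Type.FLOAT, 1.5f);

    Map<String, Object> outlinks = new LinkedHashMap<>();
    outlinks.put("a", "x");
    outlinks.put("b", new ArrayList<String>()); // empty, so it is skipped
    addOrUpdateField(client, "row1", "outlinks", Type.MAP, outlinks);

    for (String call : client.calls) {
      System.out.println(call);
    }
  }
}
```

The behaviour for each type is unchanged; the only difference is that a new simple type becomes one extra case label rather than another copy of the addColumn call. Whether the RECORD branch should get the same entry-based loop is a separate question, since it walks schema fields rather than a map.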