You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by al...@apache.org on 2011/07/22 02:34:02 UTC

svn commit: r1149420 - in /incubator/gora/trunk: gora-cassandra/ivy/ gora-cassandra/lib-ext/ gora-cassandra/src/main/java/org/apache/gora/cassandra/client/ gora-cassandra/src/main/java/org/apache/gora/cassandra/query/ gora-cassandra/src/main/java/org/a...

Author: alexis
Date: Fri Jul 22 00:33:59 2011
New Revision: 1149420

URL: http://svn.apache.org/viewvc?rev=1149420&view=rev
Log:
Cassandra 0.8 backend with Hector client

Added:
    incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar   (with props)
    incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar   (with props)
    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
Removed:
    incubator/gora/trunk/gora-cassandra/lib-ext/apache-cassandra-0.6.4.jar
    incubator/gora/trunk/gora-cassandra/lib-ext/libthrift-r917130.jar
    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/client/
    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraColumn.java
Modified:
    incubator/gora/trunk/gora-cassandra/ivy/ivy.xml
    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java
    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
    incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java

Modified: incubator/gora/trunk/gora-cassandra/ivy/ivy.xml
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/ivy/ivy.xml?rev=1149420&r1=1149419&r2=1149420&view=diff
==============================================================================
--- incubator/gora/trunk/gora-cassandra/ivy/ivy.xml (original)
+++ incubator/gora/trunk/gora-cassandra/ivy/ivy.xml Fri Jul 22 00:33:59 2011
@@ -22,7 +22,7 @@
       organisation="org.apache.gora"
       module="gora-cassandra"
       status="integration"/>
-
+      
   <configurations>
     <include file="${project.dir}/ivy/ivy-configurations.xml"/>
   </configurations>
@@ -32,15 +32,25 @@
     <artifact name="gora-cassandra-test" conf="test"/>
   </publications>
 
+  
   <dependencies>
     <!-- conf="*->@" means every conf is mapped to the conf of the same name of the artifact-->
-    <dependency org="org.apache.gora" name="gora-core" rev="latest.integration" changing="true" conf="*->@"/> 
-    <dependency org="org.jdom" name="jdom" rev="1.1" conf="*->master"/>
-
-    <dependency org="org.slf4j" name="slf4j-jdk14" rev="1.5.8" conf="*->master"/>
-    <dependency org="org.slf4j" name="slf4j-api" rev="1.5.8" conf="*->master"/>
-
-    <dependency org="com.google.guava" name="guava" rev="r06"/>
+    
+    <dependency org="org.apache.gora" name="gora-core" rev="latest.integration" changing="true" conf="*->@"/>
+    
+    <dependency org="org.jdom" name="jdom" rev="1.1">
+    	<exclude org="xerces" name="xercesImpl"/>
+    </dependency>
+    
+    <!--
+        <dependency org="org.apache.cassandra" name="apache-cassandra" rev="0.8.1"/>
+    	<dependency org="me.prettyprint" name="hector" rev="0.8.0-1"/>
+    -->
+    <dependency org="org.apache.cassandra" name="cassandra-thrift" rev="0.8.1"/>
+    <dependency org="com.ecyrd.speed4j" name="speed4j" rev="0.9" conf="*->*,!javadoc,!sources"/>
+    <dependency org="com.github.stephenc.high-scale-lib" name="high-scale-lib" rev="1.1.2" conf="*->*,!javadoc,!sources"/>
+    <dependency org="com.google.collections" name="google-collections" rev="1.0" conf="*->*,!javadoc,!sources"/>
+    <dependency org="com.google.guava" name="guava" rev="r09" conf="*->*,!javadoc,!sources"/>
 
     <!-- test dependencies -->
 

Added: incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar?rev=1149420&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar?rev=1149420&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1149420&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Fri Jul 22 00:33:59 2011
@@ -0,0 +1,41 @@
+package org.apache.gora.cassandra.query;
+
+import org.apache.avro.Schema.Field;
+
+
+/**
+ * Represents a unit of data: a key value pair tagged by a family name
+ */
+public abstract class CassandraColumn {
+  public static final int SUB = 0;
+  public static final int SUPER = 1;
+  
+  private String family;
+  private int type;
+  private Field field;
+  
+  public String getFamily() {
+    return family;
+  }
+  public void setFamily(String family) {
+    this.family = family;
+  }
+  public int getType() {
+    return type;
+  }
+  public void setType(int type) {
+    this.type = type;
+  }
+  public void setField(Field field) {
+    this.field = field;
+  }
+  
+  protected Field getField() {
+    return this.field;
+  }
+  
+  public abstract String getName();
+  public abstract Object getValue();
+  
+
+}

Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java?rev=1149420&r1=1149419&r2=1149420&view=diff
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java (original)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java Fri Jul 22 00:33:59 2011
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gora.cassandra.query;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.gora.persistency.Persistent;
-import org.apache.gora.query.Query;
-import org.apache.gora.query.impl.PartitionQueryImpl;
-import org.apache.hadoop.io.Text;
-
-public class CassandraPartitionQuery<K, T extends Persistent>
-extends PartitionQueryImpl<K, T> {
-
-  private String startToken;
-
-  private String endToken;
-
-  private String[] endPoints;
-
-  private int splitSize;
-
-  public CassandraPartitionQuery() {
-    this.dataStore = null;
-  }
-
-  public CassandraPartitionQuery(Query<K, T> baseQuery, String startToken, String endToken, String[] endPoints,
-      int splitSize) {
-    super(baseQuery);
-    this.startToken = startToken;
-    this.endToken = endToken;
-    this.endPoints = endPoints;
-    this.splitSize = splitSize;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    super.write(out);
-    Text.writeString(out, startToken);
-    Text.writeString(out, endToken);
-    out.writeInt(endPoints.length);
-    for (String endPoint : endPoints) {
-      Text.writeString(out, endPoint);
-    }
-    out.writeInt(splitSize);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
-    startToken = Text.readString(in);
-    endToken = Text.readString(in);
-    int size = in.readInt();
-    endPoints = new String[size];
-    for (int i = 0; i < size; i++) {
-      endPoints[i] = Text.readString(in);
-    }
-    splitSize = in.readInt();
-  }
-
-  public String getStartToken() {
-    return startToken;
-  }
-
-  public String getEndToken() {
-    return endToken;
-  }
-
-  public String[] getEndPoints() {
-    return endPoints;
-  }
-
-  public int getSplitSize() {
-    return splitSize;
-  }
-}

Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java?rev=1149420&r1=1149419&r2=1149420&view=diff
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java (original)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java Fri Jul 22 00:33:59 2011
@@ -1,34 +1,55 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package org.apache.gora.cassandra.query;
 
+import java.util.List;
+import java.util.Map;
+
 import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
 import org.apache.gora.query.impl.QueryBase;
 import org.apache.gora.store.DataStore;
 
-public class CassandraQuery<K, T extends Persistent>
-extends QueryBase<K, T> {
+public class CassandraQuery<K, T extends Persistent> extends QueryBase<K, T> {
 
+  private Query<K, T> query;
+  
+  /**
+   * Maps Avro fields to Cassandra columns.
+   */
+  private Map<String, List<String>> familyMap;
+  
   public CassandraQuery() {
     super(null);
   }
-
   public CassandraQuery(DataStore<K, T> dataStore) {
     super(dataStore);
   }
+  public void setFamilyMap(Map<String, List<String>> familyMap) {
+    this.familyMap = familyMap;
+  }
+  public Map<String, List<String>> getFamilyMap() {
+    return familyMap;
+  }
+  
+  /**
+   * @param family the family name
+   * @return an array of the query column names belonging to the family
+   */
+  public String[] getColumns(String family) {
+    
+    List<String> columnList = familyMap.get(family);
+    String[] columns = new String[columnList.size()];
+    for (int i = 0; i < columns.length; ++i) {
+      columns[i] = columnList.get(i);
+    }
+    return columns;
+  }
+  public Query<K, T> getQuery() {
+    return query;
+  }
+  public void setQuery(Query<K, T> query) {
+    this.query = query;
+  }
+  
+  
+
 }

Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java?rev=1149420&r1=1149419&r2=1149420&view=diff
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java (original)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java Fri Jul 22 00:33:59 2011
@@ -1,157 +1,97 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package org.apache.gora.cassandra.query;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Iterator;
-
-import org.apache.gora.cassandra.client.CassandraClient;
-import org.apache.gora.cassandra.client.Row;
-import org.apache.gora.cassandra.client.Select;
-import org.apache.gora.cassandra.store.CassandraStore;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.impl.ResultBase;
 import org.apache.gora.store.DataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class CassandraResult<K, T extends Persistent>
-extends ResultBase<K, T> {
-
-  private Iterator<Row> rowIter;
-
-  private CassandraStore<K, T> store;
+public class CassandraResult<K, T extends Persistent> extends ResultBase<K, T> {
+  public static final Logger LOG = LoggerFactory.getLogger(CassandraResult.class);
+  
+  private int rowNumber;
+
+  private CassandraResultSet cassandraResultSet;
+  
+  /**
+   * Maps Cassandra columns to Avro fields.
+   */
+  private Map<String, String> reverseMap;
 
-  private String[] fields;
-
-  public CassandraResult(DataStore<K, T> dataStore, Query<K, T> query,
-      int batchRowCount) throws IOException {
+  public CassandraResult(DataStore<K, T> dataStore, Query<K, T> query) {
     super(dataStore, query);
-
-    store = (CassandraStore<K, T>) dataStore;
-    fields = query.getFields();
-
-    boolean isUsingTokens = (query instanceof CassandraPartitionQuery);
-    String startTokenOrKey;
-    String endTokenOrKey;
-
-    if (isUsingTokens) {
-      CassandraPartitionQuery<K, T> partitionQuery = (CassandraPartitionQuery<K, T>) query;
-      startTokenOrKey = partitionQuery.getStartToken();
-      endTokenOrKey = partitionQuery.getEndToken();
-    } else {
-      CassandraQuery<K, T> cassandraQuery = (CassandraQuery<K, T>) query;
-      startTokenOrKey = cassandraQuery.getStartKey().toString();
-      endTokenOrKey = cassandraQuery.getEndKey().toString();
-    }
-
-    Select select = store.createSelect(fields);
-
-    CassandraClient client = store.getClientByLocation(getLocation(query));
-    if (isUsingTokens) {
-      rowIter =
-        client.getTokenRange(startTokenOrKey, endTokenOrKey,
-            batchRowCount, select).iterator();
-    } else {
-      rowIter = client.getRange(startTokenOrKey, endTokenOrKey,
-          batchRowCount, select).iterator();
-    }
-  }
-
-  @Override
-  public float getProgress() throws IOException {
-    return 0;
   }
 
   @Override
   protected boolean nextInner() throws IOException {
-    if (!rowIter.hasNext()) {
-      return false;
-    }
-    Row row = rowIter.next();
-    if (row == null) {
-      return false;
+    if (this.rowNumber < this.cassandraResultSet.size()) {
+      updatePersistent();
     }
-
-    key = toKey(row.getKey());
-    persistent = store.newInstance(row, fields);
-
-    return true;
+    ++this.rowNumber;
+    return (this.rowNumber <= this.cassandraResultSet.size());
   }
 
+
+  /**
+   * Load key/value pair from Cassandra row to Avro record.
+   * @throws IOException
+   */
   @SuppressWarnings("unchecked")
-  private K toKey(String keyStr) {
-    Class<K> keyClass = dataStore.getKeyClass();
-    if (keyClass.isAssignableFrom(String.class)) {
-      return (K) keyStr;
-    }
-    if (keyClass.isAssignableFrom(Integer.class)) {
-      return (K) (Integer) Integer.parseInt(keyStr);
-    }
-    if (keyClass.isAssignableFrom(Float.class)) {
-      return (K) (Float) Float.parseFloat(keyStr);
-    }
-    if (keyClass.isAssignableFrom(Double.class)) {
-      return (K) (Double) Double.parseDouble(keyStr);
-    }
-    if (keyClass.isAssignableFrom(Long.class)) {
-      return (K) (Long) Long.parseLong(keyStr);
-    }
-    if (keyClass.isAssignableFrom(Short.class)) {
-      return (K) (Short) Short.parseShort(keyStr);
-    }
-    if (keyClass.isAssignableFrom(Byte.class)) {
-      return (K) (Byte) Byte.parseByte(keyStr);
+  private void updatePersistent() throws IOException {
+    CassandraRow cassandraRow = this.cassandraResultSet.get(this.rowNumber);
+    
+    // load key
+    this.key = (K) cassandraRow.getKey();
+    
+    // load value
+    Schema schema = this.persistent.getSchema();
+    List<Field> fields = schema.getFields();
+    
+    for (CassandraColumn cassandraColumn: cassandraRow) {
+      
+      // get field name
+      String family = cassandraColumn.getFamily();
+      String fieldName = this.reverseMap.get(family + ":" + cassandraColumn.getName());
+      
+      // get field
+      int pos = this.persistent.getFieldIndex(fieldName);
+      Field field = fields.get(pos);
+      
+      // get value
+      cassandraColumn.setField(field);
+      Object value = cassandraColumn.getValue();
+      
+      this.persistent.put(pos, value);
+      // this field does not need to be written back to the store
+      this.persistent.clearDirty(pos);
     }
 
-    throw new RuntimeException("Can't parse " + keyStr +
-                               " as an instance of " + keyClass);
   }
 
   @Override
-  public void close() throws IOException { }
+  public void close() throws IOException {
+    // TODO Auto-generated method stub
+    
+  }
 
-  private String getLocation(Query<K, T> query) {
-    if (!(query instanceof CassandraPartitionQuery)) {
-      return null;
-    }
-    CassandraPartitionQuery<K, T> partitonQuery =
-      (CassandraPartitionQuery<K, T>) query;
-    InetAddress[] localAddresses = new InetAddress[0];
-    try {
-      localAddresses = InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress());
-    } catch (UnknownHostException e) {
-      throw new AssertionError(e);
-    }
-    for (InetAddress address : localAddresses) {
-      for (String location : partitonQuery.getEndPoints()) {
-        InetAddress locationAddress = null;
-        try {
-          locationAddress = InetAddress.getByName(location);
-        } catch (UnknownHostException e) {
-          throw new AssertionError(e);
-        }
-        if (address.equals(locationAddress)) {
-          return location;
-        }
-      }
-    }
-    return partitonQuery.getEndPoints()[0];
+  @Override
+  public float getProgress() throws IOException {
+    return (((float) this.rowNumber) / this.cassandraResultSet.size());
   }
-}
\ No newline at end of file
+
+  public void setResultSet(CassandraResultSet cassandraResultSet) {
+    this.cassandraResultSet = cassandraResultSet;
+  }
+  
+  public void setReverseMap(Map<String, String> reverseMap) {
+    this.reverseMap = reverseMap;
+  }
+
+}

Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java?rev=1149420&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java Fri Jul 22 00:33:59 2011
@@ -0,0 +1,36 @@
+package org.apache.gora.cassandra.query;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+/**
+ * List data structure to keep the order coming from the Cassandra selects.
+ */
+public class CassandraResultSet extends ArrayList<CassandraRow> {
+
+  /**
+   * 
+   */
+  private static final long serialVersionUID = -7620939600192859652L;
+
+  /**
+   * Maps keys to indices in the list.
+   */
+  private HashMap<String, Integer> indexMap = new HashMap<String, Integer>();
+
+  public CassandraRow getRow(String key) {
+    Integer integer = this.indexMap.get(key);
+    if (integer == null) {
+      return null;
+    }
+    
+    return this.get(integer);
+  }
+
+  public void putRow(String key, CassandraRow cassandraRow) {
+    this.add(cassandraRow);
+    this.indexMap.put(key, this.size()-1);
+  } 
+  
+
+}
\ No newline at end of file

Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java?rev=1149420&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java Fri Jul 22 00:33:59 2011
@@ -0,0 +1,24 @@
+package org.apache.gora.cassandra.query;
+
+import java.util.ArrayList;
+
+/**
+ * List of key value pairs representing a row, tagged by a key.
+ */
+public class CassandraRow extends ArrayList<CassandraColumn> {
+
+  /**
+   * 
+   */
+  private static final long serialVersionUID = -7620939600192859652L;
+  private String key;
+
+  public String getKey() {
+    return this.key;
+  }
+
+  public void setKey(String key) {
+    this.key = key;
+  }
+
+}

Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1149420&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java Fri Jul 22 00:33:59 2011
@@ -0,0 +1,104 @@
+package org.apache.gora.cassandra.query;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+
+import me.prettyprint.hector.api.beans.HColumn;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CassandraSubColumn extends CassandraColumn {
+  public static final Logger LOG = LoggerFactory.getLogger(CassandraSubColumn.class);
+
+  private static final String ENCODING = "UTF-8";
+  
+  private static CharsetEncoder charsetEncoder = Charset.forName(ENCODING).newEncoder();;
+
+
+  /**
+   * Key-value pair containing the raw data.
+   */
+  private HColumn<String, String> hColumn;
+
+  public String getName() {
+    return hColumn.getName();
+  }
+
+  /**
+   * Deserialize a String into a typed Object, according to the field schema.
+   * @see org.apache.gora.cassandra.query.CassandraColumn#getValue()
+   */
+  public Object getValue() {
+    Field field = getField();
+    Schema fieldSchema = field.schema();
+    Type type = fieldSchema.getType();
+    String valueString = hColumn.getValue();
+    Object value = null;
+    
+    switch (type) {
+      case STRING:
+        value = new Utf8(valueString);
+        break;
+      case BYTES:
+        // convert string to bytebuffer
+        value = getByteBuffer(valueString);
+        break;
+      case INT:
+        value = Integer.parseInt(valueString);
+        break;
+      case LONG:
+        value = Long.parseLong(valueString);
+        break;
+      case FLOAT:
+        value = Float.parseFloat(valueString);
+        break;
+      case ARRAY:
+        // convert string to array
+        valueString = valueString.substring(1, valueString.length()-1);
+        String[] elements = valueString.split(", ");
+        
+        Type elementType = fieldSchema.getElementType().getType();
+        if (elementType == Schema.Type.STRING) {
+          // the array type is String
+          GenericArray<String> genericArray = new GenericData.Array<String>(elements.length, Schema.createArray(Schema.create(Schema.Type.STRING)));
+          for (String element: elements) {
+            genericArray.add(element);
+          }
+          
+          value = genericArray;
+        } else {
+          LOG.info("Element type not supported: " + elementType);
+        }
+        break;
+      default:
+        LOG.info("Type not supported: " + type);
+    }
+    
+    return value;
+
+  }
+
+  public void setValue(HColumn<String, String> hColumn) {
+    this.hColumn = hColumn;
+  }
+  
+  public static ByteBuffer getByteBuffer(String valueString) {
+    ByteBuffer byteBuffer = null;
+    try {
+      byteBuffer = charsetEncoder.encode(CharBuffer.wrap(valueString));
+    } catch (CharacterCodingException cce) {
+      LOG.warn("Unable to encode " + valueString + " into " + ENCODING);
+    }
+    return byteBuffer;
+  }
+}

Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1149420&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java Fri Jul 22 00:33:59 2011
@@ -0,0 +1,102 @@
+package org.apache.gora.cassandra.query;
+
+import java.util.Map;
+
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.beans.HSuperColumn;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.StatefulHashMap;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CassandraSuperColumn extends CassandraColumn {
+  public static final Logger LOG = LoggerFactory.getLogger(CassandraSuperColumn.class);
+
+  private HSuperColumn<String, String, String> hSuperColumn;
+  
+  public String getName() {
+    return hSuperColumn.getName();
+  }
+
+  public Object getValue() {
+    Field field = getField();
+    Schema fieldSchema = field.schema();
+    Type type = fieldSchema.getType();
+    
+    Object value = null;
+    
+    switch (type) {
+      case MAP:
+        Map<Utf8, Object> map = new StatefulHashMap<Utf8, Object>();
+        Type valueType = fieldSchema.getValueType().getType();
+        
+        for (HColumn<String, String> hColumn : this.hSuperColumn.getColumns()) {
+          String memberString = hColumn.getValue();
+          Object memberValue = null;
+          switch (valueType) {
+            case STRING:
+              memberValue = new Utf8(memberString);
+              break;
+            case BYTES:
+              memberValue = CassandraSubColumn.getByteBuffer(memberString);
+              break;
+            default:
+              LOG.info("Type for the map value is not supported: " + valueType);
+                
+          }
+          map.put(new Utf8(hColumn.getName()), memberValue);      
+        }
+        value = map;
+        
+        break;
+      case RECORD:
+        String fullName = fieldSchema.getFullName();
+        
+        Class<?> claz = null;
+        try {
+          claz = Class.forName(fullName);
+        } catch (ClassNotFoundException cnfe) {
+          LOG.warn("Unable to load class " + fullName, cnfe);
+          break;
+        }
+
+        try {
+          value = claz.newInstance();          
+        } catch (InstantiationException ie) {
+          LOG.warn("Instantiation error", ie);
+          break;
+        } catch (IllegalAccessException iae) {
+          LOG.warn("Illegal access error", iae);
+          break;
+        }
+        
+        // we updated the value instance, now update its members
+        if (value instanceof PersistentBase) {
+          PersistentBase record = (PersistentBase) value;
+
+          for (HColumn<String, String> hColumn : this.hSuperColumn.getColumns()) {
+            Field memberField = fieldSchema.getField(hColumn.getName());
+            CassandraSubColumn cassandraColumn = new CassandraSubColumn();
+            cassandraColumn.setField(memberField);
+            cassandraColumn.setValue(hColumn);
+            record.put(record.getFieldIndex(hColumn.getName()), cassandraColumn.getValue());
+          }
+        }
+        break;
+      default:
+        LOG.info("Type not supported: " + type);
+    }
+    
+    return value;
+  }
+  
+  public void setValue(HSuperColumn<String, String, String> hSuperColumn) {
+    this.hSuperColumn = hSuperColumn;
+  }
+
+}

Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1149420&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Fri Jul 22 00:33:59 2011
@@ -0,0 +1,253 @@
+package org.apache.gora.cassandra.store;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.service.CassandraHostConfigurator;
+import me.prettyprint.hector.api.Cluster;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.Serializer;
+import me.prettyprint.hector.api.beans.OrderedRows;
+import me.prettyprint.hector.api.beans.OrderedSuperRows;
+import me.prettyprint.hector.api.beans.Row;
+import me.prettyprint.hector.api.beans.SuperRow;
+import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
+import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
+import me.prettyprint.hector.api.factory.HFactory;
+import me.prettyprint.hector.api.mutation.Mutator;
+import me.prettyprint.hector.api.query.QueryResult;
+import me.prettyprint.hector.api.query.RangeSlicesQuery;
+import me.prettyprint.hector.api.query.RangeSuperSlicesQuery;
+
+import org.apache.gora.cassandra.query.CassandraQuery;
+import org.apache.gora.mapreduce.GoraRecordReader;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.util.ByteUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CassandraClient<K, T extends Persistent> {
+  public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);
+  
+  private Cluster cluster;
+  private Keyspace keyspace;
+  private Mutator<String> mutator;
+  
+  private CassandraMapping cassandraMapping = new CassandraMapping();
+
+  private Serializer<String> stringSerializer = new StringSerializer();
+  
+  public void init() throws Exception {
+    this.cassandraMapping.loadConfiguration();
+    this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), new CassandraHostConfigurator(this.cassandraMapping.getHostName()));
+    
+    // add keyspace to cluster
+    checkKeyspace();
+    
+    // Just create a Keyspace object on the client side, corresponding to an already existing keyspace with already created column families.
+    this.keyspace = HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster);
+    
+    this.mutator = HFactory.createMutator(this.keyspace, this.stringSerializer);
+  }
+  
+  /**
+   * Makes sure the mapped keyspace exists in the cluster. When the cluster does not
+   * describe it yet, the keyspace is created together with the mapped column families.
+   */
+  public void checkKeyspace() {
+    String keyspaceName = this.cassandraMapping.getKeyspaceName();
+
+    // "describe keyspace <keyspaceName>;" query
+    if (this.cluster.describeKeyspace(keyspaceName) != null) {
+      // nothing to do, the keyspace is already known to the cluster
+      return;
+    }
+
+    List<ColumnFamilyDefinition> families = this.cassandraMapping.getColumnFamilyDefinitions();
+    KeyspaceDefinition created = HFactory.createKeyspaceDefinition(keyspaceName, "org.apache.cassandra.locator.SimpleStrategy", 1, families);
+    this.cluster.addKeyspace(created);
+    LOG.info("Keyspace '" + keyspaceName + "' in cluster '" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName() + "'");
+  }
+  
+  /**
+   * Drop keyspace.
+   */
+  public void dropKeyspace() {
+    // "drop keyspace <keyspaceName>;" query
+    this.cluster.dropKeyspace(this.cassandraMapping.getKeyspaceName());
+  }
+
+  /**
+   * Insert a field in a column.
+   * Nothing is written when the value is null; a ByteBuffer value is first converted to a string.
+   * @param key the row key
+   * @param fieldName the field name, used to look up the column family and the column name
+   * @param value the field value, stored through its string form
+   */
+  public void addColumn(String key, String fieldName, Object value) {
+    if (value == null) {
+      return;
+    }
+
+    // binary values are turned into a string before being stored
+    Object storable = (value instanceof ByteBuffer) ? toString((ByteBuffer) value) : value;
+
+    String family = this.cassandraMapping.getFamily(fieldName);
+    String qualifier = this.cassandraMapping.getColumn(fieldName);
+
+    this.mutator.insert(key, family, HFactory.createStringColumn(qualifier, storable.toString()));
+  }
+
+  /**
+   * Converts the readable content of a byte buffer to a string.
+   * Only the bytes between the buffer's position and its limit are converted. They are
+   * copied out of a duplicate, so the caller's buffer keeps its position and limit, and
+   * buffers without an accessible backing array (direct or read-only) are supported too.
+   * TODO do not convert bytes to string to store a binary field
+   * @param value the buffer to convert
+   * @return the string built by ByteUtils from the remaining bytes
+   */
+  private static String toString(ByteBuffer value) {
+    // Read through a duplicate: it shares the content but has its own position and limit.
+    ByteBuffer readable = value.duplicate();
+    byte[] bytes = new byte[readable.remaining()];
+    readable.get(bytes);
+    return ByteUtils.toString(bytes, 0, bytes.length);
+  }
+
+  /**
+   * Insert a member in a super column. This is used for map and record Avro types.
+   * Nothing is written when the value is null; a ByteBuffer value is first converted to a string.
+   * @param key the row key
+   * @param fieldName the field name, used to look up the column family and the super column name
+   * @param memberName the member name, used as the sub column name
+   * @param value the member value, stored through its string form
+   */
+  public void addSubColumn(String key, String fieldName, String memberName, Object value) {
+    if (value == null) {
+      return;
+    }
+
+    // binary values are turned into a string before being stored
+    Object storable = (value instanceof ByteBuffer) ? toString((ByteBuffer) value) : value;
+
+    String family = this.cassandraMapping.getFamily(fieldName);
+    String superColumn = this.cassandraMapping.getColumn(fieldName);
+
+    this.mutator.insert(key, family, HFactory.createSuperColumn(superColumn, Arrays.asList(HFactory.createStringColumn(memberName, storable.toString())), this.stringSerializer, this.stringSerializer, this.stringSerializer));
+  }
+  
+  /**
+   * Select a family column in the keyspace.
+   * The query's start and end keys bound the row range (a null key is replaced by the
+   * empty string) and the query's limit bounds the number of rows.
+   * @param cassandraQuery a wrapper of the query
+   * @param family the family name to be queried
+   * @return a list of family rows
+   */
+  public List<Row<String, String, String>> execute(CassandraQuery<K, T> cassandraQuery, String family) {
+    String[] columnNames = cassandraQuery.getColumns(family);
+    Query<K, T> query = cassandraQuery.getQuery();
+
+    // The limit is a long and is not positive when the caller did not set one
+    // (NOTE(review): Gora's default is believed to be -1 - confirm). The row count
+    // must be a positive int, so "no limit" and values that do not fit in an int
+    // are both mapped to Integer.MAX_VALUE instead of being blindly cast.
+    long requestedLimit = query.getLimit();
+    int limit = (requestedLimit < 1 || requestedLimit > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) requestedLimit;
+
+    String startKey = (String) query.getStartKey();
+    if (startKey == null) {
+      startKey = "";
+    }
+    String endKey = (String) query.getEndKey();
+    if (endKey == null) {
+      endKey = "";
+    }
+
+    RangeSlicesQuery<String, String, String> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, this.stringSerializer, stringSerializer, stringSerializer);
+    rangeSlicesQuery.setColumnFamily(family);
+    rangeSlicesQuery.setKeys(startKey, endKey);
+    rangeSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
+    rangeSlicesQuery.setRowCount(limit);
+    rangeSlicesQuery.setColumnNames(columnNames);
+
+    QueryResult<OrderedRows<String, String, String>> queryResult = rangeSlicesQuery.execute();
+    OrderedRows<String, String, String> orderedRows = queryResult.get();
+
+    return orderedRows.getList();
+  }
+
+  /**
+   * Groups the columns mapped to the query fields by column family.
+   * A family gets an entry as soon as one query field maps to it, even when that field
+   * has no mapped column.
+   * @param query indicates the columns to select
+   * @return a map whose keys are the family names and whose values are the column names required to get all the query fields
+   */
+  public Map<String, List<String>> getFamilyMap(Query<K, T> query) {
+    Map<String, List<String>> columnsByFamily = new HashMap<String, List<String>>();
+
+    for (String field : query.getFields()) {
+      String family = this.cassandraMapping.getFamily(field);
+      String column = this.cassandraMapping.getColumn(field);
+
+      // register the family the first time it is met
+      if (!columnsByFamily.containsKey(family)) {
+        columnsByFamily.put(family, new ArrayList<String>());
+      }
+
+      if (column == null) {
+        continue;
+      }
+      columnsByFamily.get(family).add(column);
+    }
+
+    return columnsByFamily;
+  }
+  
+  /**
+   * Builds the lookup from column names back to query fields. Column names are fully
+   * qualified, in the form "family:column".
+   * @param query indicates the fields to map
+   * @return a map whose keys are the fully qualified column names and whose values are the query fields
+   */
+  public Map<String, String> getReverseMap(Query<K, T> query) {
+    Map<String, String> fieldsByColumn = new HashMap<String, String>();
+
+    for (String field : query.getFields()) {
+      String qualifiedName = this.cassandraMapping.getFamily(field) + ":" + this.cassandraMapping.getColumn(field);
+      fieldsByColumn.put(qualifiedName, field);
+    }
+
+    return fieldsByColumn;
+  }
+
+  public boolean isSuper(String family) {
+    return this.cassandraMapping.isSuper(family);
+  }
+
+  /**
+   * Select a super column family in the keyspace.
+   * The query's start and end keys bound the row range (a null key is replaced by the
+   * empty string) and the query's limit bounds the number of rows.
+   * @param cassandraQuery a wrapper of the query
+   * @param family the super column family name to be queried
+   * @return a list of super rows
+   */
+  public List<SuperRow<String, String, String, String>> executeSuper(CassandraQuery<K, T> cassandraQuery, String family) {
+    String[] columnNames = cassandraQuery.getColumns(family);
+    Query<K, T> query = cassandraQuery.getQuery();
+
+    // The limit is a long and is not positive when the caller did not set one
+    // (NOTE(review): Gora's default is believed to be -1 - confirm). The row count
+    // must be a positive int, so "no limit" and values that do not fit in an int
+    // are both mapped to Integer.MAX_VALUE instead of being blindly cast.
+    long requestedLimit = query.getLimit();
+    int limit = (requestedLimit < 1 || requestedLimit > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) requestedLimit;
+
+    String startKey = (String) query.getStartKey();
+    if (startKey == null) {
+      startKey = "";
+    }
+    String endKey = (String) query.getEndKey();
+    if (endKey == null) {
+      endKey = "";
+    }
+
+    RangeSuperSlicesQuery<String, String, String, String> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace, this.stringSerializer, this.stringSerializer, this.stringSerializer, this.stringSerializer);
+    rangeSuperSlicesQuery.setColumnFamily(family);
+    rangeSuperSlicesQuery.setKeys(startKey, endKey);
+    rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
+    rangeSuperSlicesQuery.setRowCount(limit);
+    rangeSuperSlicesQuery.setColumnNames(columnNames);
+
+    QueryResult<OrderedSuperRows<String, String, String, String>> queryResult = rangeSuperSlicesQuery.execute();
+    OrderedSuperRows<String, String, String, String> orderedRows = queryResult.get();
+    return orderedRows.getList();
+  }
+}

Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java?rev=1149420&r1=1149419&r2=1149420&view=diff
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java (original)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java Fri Jul 22 00:33:59 2011
@@ -1,50 +1,155 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package org.apache.gora.cassandra.store;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
+
+import me.prettyprint.cassandra.model.BasicColumnFamilyDefinition;
+import me.prettyprint.cassandra.service.ThriftCfDef;
+import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
+import me.prettyprint.hector.api.ddl.ColumnType;
+import me.prettyprint.hector.api.ddl.ComparatorType;
+
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+import org.jdom.input.SAXBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CassandraMapping {
+  
+  public static final Logger LOG = LoggerFactory.getLogger(CassandraMapping.class);
+  
+  private static final String MAPPING_FILE = "gora-cassandra-mapping.xml";
+  private static final String KEYSPACE_ELEMENT = "keyspace";
+  private static final String NAME_ATTRIBUTE = "name";
+  private static final String MAPPING_ELEMENT = "class";
+  private static final String COLUMN_ATTRIBUTE = "qualifier";
+  private static final String FAMILY_ATTRIBUTE = "family";
+  private static final String SUPER_ATTRIBUTE = "type";
+  private static final String CLUSTER_ATTRIBUTE = "cluster";
+  private static final String HOST_ATTRIBUTE = "host";
+
+
+  private String hostName;
+  private String clusterName;
+  private String keyspaceName;
+  
+  
+  /**
+   * List of the super column families.
+   */
+  private List<String> superFamilies = new ArrayList<String>();
+
+  /**
+   * Look up the column family associated to the Avro field.
+   */
+  private Map<String, String> familyMap = new HashMap<String, String>();
+  
+  /**
+   * Look up the column associated to the Avro field.
+   */
+  private Map<String, String> columnMap = new HashMap<String, String>();
+
+  /**
+   * Look up the column family from its name.
+   */
+  private Map<String, BasicColumnFamilyDefinition> columnFamilyDefinitions = new HashMap<String, BasicColumnFamilyDefinition>();
 
-  private String keySpace;
+  /**
+   * @return the value of the "host" attribute read from the keyspace element by
+   *         loadConfiguration(), or null when no configuration has been loaded
+   */
+  public String getHostName() {
+    return hostName;
+  }
 
-  private Map<String, Boolean> families =
-    new HashMap<String, Boolean>();
+  /**
+   * @return the value of the "cluster" attribute read from the keyspace element by
+   *         loadConfiguration(), or null when no configuration has been loaded
+   */
+  public String getClusterName() {
+    return clusterName;
+  }
 
-  public String getKeySpace() {
-    return keySpace;
+  /**
+   * @return the value of the "name" attribute read from the keyspace element by
+   *         loadConfiguration(), or null when no configuration has been loaded
+   */
+  public String getKeyspaceName() {
+    return keyspaceName;
   }
 
-  public void setKeySpace(String keySpace) {
-    this.keySpace = keySpace;
+
+  /**
+   * Loads the cluster, keyspace, column family and field mapping settings from the
+   * "gora-cassandra-mapping.xml" resource found through this class' class loader.
+   * Every child of the keyspace element becomes a column family definition (a super
+   * column family when its "type" attribute is present); every child of the class
+   * element maps a field name to its family and qualifier.
+   * @throws JDOMException if the file cannot be parsed or lacks the keyspace or class element
+   * @throws IOException if the mapping file cannot be found or read
+   */
+  @SuppressWarnings("unchecked")
+  public void loadConfiguration() throws JDOMException, IOException {
+    // Fully qualified because java.io.InputStream is not imported by this file.
+    java.io.InputStream mappingStream = getClass().getClassLoader().getResourceAsStream(MAPPING_FILE);
+    if (mappingStream == null) {
+      throw new IOException("Mapping file " + MAPPING_FILE + " was not found on the class path");
+    }
+
+    Document document;
+    try {
+      document = new SAXBuilder().build(mappingStream);
+    } finally {
+      // release the resource stream whether or not parsing succeeded
+      mappingStream.close();
+    }
+    Element root = document.getRootElement();
+
+    Element keyspace = root.getChild(KEYSPACE_ELEMENT);
+    if (keyspace == null) {
+      throw new JDOMException("Element '" + KEYSPACE_ELEMENT + "' is missing in " + MAPPING_FILE);
+    }
+    this.keyspaceName = keyspace.getAttributeValue(NAME_ATTRIBUTE);
+    this.clusterName = keyspace.getAttributeValue(CLUSTER_ATTRIBUTE);
+    this.hostName = keyspace.getAttributeValue(HOST_ATTRIBUTE);
+
+    // load column family definitions
+    List<Element> elements = keyspace.getChildren();
+    for (Element element: elements) {
+      BasicColumnFamilyDefinition cfDef = new BasicColumnFamilyDefinition();
+
+      String familyName = element.getAttributeValue(NAME_ATTRIBUTE);
+
+      // the mere presence of the attribute marks the family as a super column family
+      String superAttribute = element.getAttributeValue(SUPER_ATTRIBUTE);
+      if (superAttribute != null) {
+        this.superFamilies.add(familyName);
+        cfDef.setColumnType(ColumnType.SUPER);
+        cfDef.setSubComparatorType(ComparatorType.UTF8TYPE);
+      }
+
+      cfDef.setKeyspaceName(this.keyspaceName);
+      cfDef.setName(familyName);
+      cfDef.setComparatorType(ComparatorType.UTF8TYPE);
+      cfDef.setDefaultValidationClass(ComparatorType.UTF8TYPE.getClassName());
+
+      this.columnFamilyDefinitions.put(familyName, cfDef);
+    }
+
+    // load column definitions
+    Element mapping = root.getChild(MAPPING_ELEMENT);
+    if (mapping == null) {
+      throw new JDOMException("Element '" + MAPPING_ELEMENT + "' is missing in " + MAPPING_FILE);
+    }
+    elements = mapping.getChildren();
+    for (Element element: elements) {
+      String fieldName = element.getAttributeValue(NAME_ATTRIBUTE);
+      String familyName = element.getAttributeValue(FAMILY_ATTRIBUTE);
+      String columnName = element.getAttributeValue(COLUMN_ATTRIBUTE);
+
+      // an undeclared family is only reported; the field mapping is still recorded
+      BasicColumnFamilyDefinition columnFamilyDefinition = this.columnFamilyDefinitions.get(familyName);
+      if (columnFamilyDefinition == null) {
+        LOG.warn("Family " + familyName + " was not declared in the keyspace.");
+      }
+
+      this.familyMap.put(fieldName, familyName);
+      this.columnMap.put(fieldName, columnName);
+    }
   }
 
-  public Set<String> getColumnFamilies() {
-    return families.keySet();
+  /**
+   * Looks up the column family mapped to a field.
+   * @param name the field name
+   * @return the family name recorded for that field, or null when the field is not mapped
+   */
+  public String getFamily(String name) {
+    return familyMap.get(name);
   }
 
-  public void addColumnFamily(String columnFamily, boolean isSuper) {
-    families.put(columnFamily, isSuper);
+  /**
+   * Looks up the column mapped to a field.
+   * @param name the field name
+   * @return the column name recorded for that field, or null when none is recorded
+   */
+  public String getColumn(String name) {
+    return columnMap.get(name);
   }
 
-  public boolean isColumnFamilySuper(String columnFamily) {
-    return families.get(columnFamily);
+  /**
+   * Read family super attribute.
+   * @param family the family name
+   * @return true if the family was registered as a super column family
+   */
+  public boolean isSuper(String family) {
+    return superFamilies.contains(family);
   }
+
+  /**
+   * Wraps every loaded column family definition in a ThriftCfDef.
+   * @return a new list holding one ThriftCfDef per loaded column family
+   */
+  public List<ColumnFamilyDefinition> getColumnFamilyDefinitions() {
+    List<ColumnFamilyDefinition> definitions = new ArrayList<ColumnFamilyDefinition>();
+    for (ColumnFamilyDefinition definition : this.columnFamilyDefinitions.values()) {
+      definitions.add(new ThriftCfDef(definition));
+    }
+    return definitions;
+  }
+
 }

Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1149420&r1=1149419&r2=1149420&view=diff
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Fri Jul 22 00:33:59 2011
@@ -1,465 +1,334 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package org.apache.gora.cassandra.store;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
+
+import me.prettyprint.hector.api.beans.ColumnSlice;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.beans.HSuperColumn;
+import me.prettyprint.hector.api.beans.Row;
+import me.prettyprint.hector.api.beans.SuperRow;
+import me.prettyprint.hector.api.beans.SuperSlice;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.util.Utf8;
-import org.apache.cassandra.thrift.TokenRange;
-import org.apache.gora.cassandra.client.CassandraClient;
-import org.apache.gora.cassandra.client.Mutate;
-import org.apache.gora.cassandra.client.Row;
-import org.apache.gora.cassandra.client.Select;
-import org.apache.gora.cassandra.client.SimpleCassandraClient;
-import org.apache.gora.cassandra.query.CassandraPartitionQuery;
 import org.apache.gora.cassandra.query.CassandraQuery;
 import org.apache.gora.cassandra.query.CassandraResult;
-import org.apache.gora.persistency.ListGenericArray;
+import org.apache.gora.cassandra.query.CassandraResultSet;
+import org.apache.gora.cassandra.query.CassandraRow;
+import org.apache.gora.cassandra.query.CassandraSubColumn;
+import org.apache.gora.cassandra.query.CassandraSuperColumn;
 import org.apache.gora.persistency.Persistent;
-import org.apache.gora.persistency.State;
-import org.apache.gora.persistency.StateManager;
 import org.apache.gora.persistency.StatefulHashMap;
-import org.apache.gora.persistency.StatefulMap;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.persistency.impl.StateManagerImpl;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
-import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.query.impl.PartitionQueryImpl;
 import org.apache.gora.store.impl.DataStoreBase;
-import org.apache.gora.util.ByteUtils;
-import org.jdom.Document;
-import org.jdom.Element;
-import org.jdom.input.SAXBuilder;
-
-/**
- * DataStore for Cassandra.
- *
- * <p> Note: CassandraStore is not thread-safe. </p>
- */
-public class CassandraStore<K, T extends Persistent>
-extends DataStoreBase<K, T> {
-
-  private static final String ERROR_MESSAGE =
-    "Cassandra does not support creating or modifying ColumnFamilies during runtime";
-
-  private static final String DEFAULT_MAPPING_FILE = "gora-cassandra-mapping.xml";
-
-  private static final int SPLIT_SIZE = 65536;
-
-  private static final int BATCH_COUNT = 256;
-
-  private CassandraClient client;
-
-  private Map<String, CassandraColumn> columnMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-  private CassandraMapping mapping;
-
-  @Override
-  public void initialize(Class<K> keyClass, Class<T> persistentClass,
-      Properties properties) throws IOException {
-    super.initialize(keyClass, persistentClass, properties);
-
-    String mappingFile =
-      DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
-
-    readMapping(mappingFile);
+public class CassandraStore<K, T extends Persistent> extends DataStoreBase<K, T> {
+  public static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
+  
+  private CassandraClient<K, T>  cassandraClient = new CassandraClient<K, T>();
+
+  /**
+   * The values are Avro fields pending to be stored.
+   */
+  private Map<K, T> buffer = new HashMap<K, T>();
+  
+  /**
+   * Creates the store and initializes its Cassandra client.
+   * @throws Exception if the client initialization fails
+   */
+  public CassandraStore() throws Exception {
+    cassandraClient.init();
   }
 
   @Override
-  public String getSchemaName() {
-    return mapping.getKeySpace();
+  public void close() throws IOException {
+    LOG.debug("close");
+    flush();
   }
 
   @Override
-  public void createSchema() throws IOException {
-    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  public void createSchema() {
+    LOG.debug("create schema");
+    this.cassandraClient.checkKeyspace();
   }
 
   @Override
-  public void deleteSchema() throws IOException {
-    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  public boolean delete(K key) throws IOException {
+    LOG.debug("delete " + key);
+    return false;
   }
 
   @Override
-  public boolean schemaExists() throws IOException {
-    return true;
+  public long deleteByQuery(Query<K, T> query) throws IOException {
+    LOG.debug("delete by query " + query);
+    return 0;
   }
 
-  public CassandraClient getClientByLocation(String endPoint) {
-    return client;
+  /**
+   * Deletes the schema by asking the client to drop the keyspace.
+   */
+  @Override
+  public void deleteSchema() throws IOException {
+    LOG.debug("delete schema");
+    cassandraClient.dropKeyspace();
   }
 
-  public Select createSelect(String[] fields) {
-    Select select = new Select();
-    if (fields == null) {
-      fields = beanFactory.getCachedPersistent().getFields();
-    }
-    for (String f : fields) {
-      CassandraColumn col = columnMap.get(f);
-      Schema fieldSchema = fieldMap.get(f).schema();
-      switch (fieldSchema.getType()) {
-        case MAP:
-        case ARRAY:
-          if (col.isSuperColumn()) {
-            select.addAllColumnsForSuperColumn(col.family, col.superColumn);
-          } else {
-            select.addColumnAll(col.family);
-          }
-          break;
-        default:
-          if (col.isSuperColumn()) {
-            select.addColumnName(col.family, col.superColumn, col.column);
-          } else {
-            select.addColumnName(col.family, col.column);
-          }
-          break;
+  @Override
+  public Result<K, T> execute(Query<K, T> query) throws IOException {
+    
+    Map<String, List<String>> familyMap = this.cassandraClient.getFamilyMap(query);
+    Map<String, String> reverseMap = this.cassandraClient.getReverseMap(query);
+    
+    CassandraQuery<K, T> cassandraQuery = new CassandraQuery<K, T>();
+    cassandraQuery.setQuery(query);
+    cassandraQuery.setFamilyMap(familyMap);
+    
+    CassandraResult<K, T> cassandraResult = new CassandraResult<K, T>(this, query);
+    cassandraResult.setReverseMap(reverseMap);
+
+    CassandraResultSet cassandraResultSet = new CassandraResultSet();
+    
+    // We query Cassandra keyspace by families.
+    for (String family : familyMap.keySet()) {
+      if (this.cassandraClient.isSuper(family)) {
+        addSuperColumns(family, cassandraQuery, cassandraResultSet);
+         
+      } else {
+        addSubColumns(family, cassandraQuery, cassandraResultSet);
+      
       }
+      
     }
-    return select;
-  }
+    
+    cassandraResult.setResultSet(cassandraResultSet);
+    
+    
+    return cassandraResult;
 
-  @Override
-  public T get(K key, String[] fields) throws IOException {
-    if (fields == null) {
-      fields = beanFactory.getCachedPersistent().getFields();
-    }
-    Select select = createSelect(fields);
-    try {
-      Row result = client.get(key.toString(), select);
-      return newInstance(result, fields);
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
   }
 
-  @SuppressWarnings("rawtypes")
-  private void setField(T persistent, Field field, StatefulMap map) {
-    persistent.put(field.pos(), map);
-  }
-
-  private void setField(T persistent, Field field, byte[] val)
-  throws IOException {
-    persistent.put(field.pos()
-        , ByteUtils.fromBytes(val, field.schema(), datumReader, persistent.get(field.pos())));
-  }
-
-  @SuppressWarnings("rawtypes")
-  private void setField(T persistent, Field field, GenericArray list) {
-    persistent.put(field.pos(), list);
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  public T newInstance(Row result, String[] fields)
-  throws IOException {
-    if(result == null)
-      return null;
-
-    T persistent = newPersistent();
-    StateManager stateManager = persistent.getStateManager();
-    for (String f : fields) {
-      CassandraColumn col = columnMap.get(f);
-      Field field = fieldMap.get(f);
-      Schema fieldSchema = field.schema();
-      Map<String, byte[]> qualMap;
-      switch(fieldSchema.getType()) {
-        case MAP:
-          if (col.isSuperColumn()) {
-            qualMap = result.getSuperColumn(col.family, col.superColumn);
-          } else {
-            qualMap = result.getColumn(col.family);
-          }
-          if (qualMap == null) {
-            continue;
-          }
-          Schema valueSchema = fieldSchema.getValueType();
-          StatefulMap map = new StatefulHashMap();
-          for (Entry<String, byte[]> e : qualMap.entrySet()) {
-            Utf8 mapKey = new Utf8(e.getKey());
-            map.put(mapKey, ByteUtils.fromBytes(e.getValue(), valueSchema, datumReader, null));
-            map.putState(mapKey, State.CLEAN);
-          }
-          setField(persistent, field, map);
-          break;
-        case ARRAY:
-          if (col.isSuperColumn()) {
-            qualMap = result.getSuperColumn(col.family, col.superColumn);
-          } else {
-            qualMap = result.getColumn(col.family);
-          }
-          if (qualMap == null) {
-            continue;
-          }
-          valueSchema = fieldSchema.getElementType();
-          ArrayList arrayList = new ArrayList();
-          for (Entry<String, byte[]> e : qualMap.entrySet()) {
-            arrayList.add(ByteUtils.fromBytes(e.getValue(), valueSchema, datumReader, null));
-          }
-          ListGenericArray arr = new ListGenericArray(fieldSchema, arrayList);
-          setField(persistent, field, arr);
-          break;
-        default:
-          byte[] val;
-          if (col.isSuperColumn()) {
-            val = result.get(col.family, col.superColumn, col.column);
-          } else {
-            val = result.get(col.family, col.column);
-          }
-          if (val == null) {
-            continue;
-          }
-          setField(persistent, field, val);
-          break;
+  private void addSubColumns(String family, CassandraQuery<K, T> cassandraQuery,
+      CassandraResultSet cassandraResultSet) {
+    // select family columns that are included in the query
+    List<Row<String, String, String>> rows = this.cassandraClient.execute(cassandraQuery, family);
+    
+    for (Row<String, String, String> row : rows) {
+      String key = row.getKey();
+      
+      // find associated row in the resultset
+      CassandraRow cassandraRow = cassandraResultSet.getRow(key);
+      if (cassandraRow == null) {
+        cassandraRow = new CassandraRow();
+        cassandraResultSet.putRow(key, cassandraRow);
+        cassandraRow.setKey(key);
       }
+      
+      ColumnSlice<String, String> columnSlice = row.getColumnSlice();
+      
+      for (HColumn<String, String> hColumn : columnSlice.getColumns()) {
+        CassandraSubColumn cassandraSubColumn = new CassandraSubColumn();
+        cassandraSubColumn.setValue(hColumn);
+        cassandraSubColumn.setFamily(family);
+        cassandraRow.add(cassandraSubColumn);
+      }
+      
     }
-    stateManager.clearDirty(persistent);
-    return persistent;
   }
 
-  @Override
-  public void put(K key, T obj) throws IOException {
-    Mutate mutate = new Mutate();
-    Schema schema = obj.getSchema();
-    StateManager stateManager = obj.getStateManager();
-    List<Field> fields = schema.getFields();
-    String qual;
-    byte[] value;
-    for (int i = 0; i < fields.size(); i++) {
-      if (!stateManager.isDirty(obj, i)) {
-        continue;
+  private void addSuperColumns(String family, CassandraQuery<K, T> cassandraQuery, 
+      CassandraResultSet cassandraResultSet) {
+    
+    List<SuperRow<String, String, String, String>> superRows = this.cassandraClient.executeSuper(cassandraQuery, family);
+    for (SuperRow<String, String, String, String> superRow: superRows) {
+      String key = superRow.getKey();
+      CassandraRow cassandraRow = cassandraResultSet.getRow(key);
+      if (cassandraRow == null) {
+        cassandraRow = new CassandraRow();
+        cassandraResultSet.putRow(key, cassandraRow);
+        cassandraRow.setKey(key);
       }
-      Field field = fields.get(i);
-      Type type = field.schema().getType();
-      Object o = obj.get(i);
-      CassandraColumn col = columnMap.get(field.name());
-
-      switch(type) {
-      case MAP:
-        if(o instanceof StatefulMap) {
-          @SuppressWarnings("unchecked")
-          StatefulMap<Utf8, ?> map = (StatefulMap<Utf8, ?>) o;
-          for (Entry<Utf8, State> e : map.states().entrySet()) {
-            Utf8 mapKey = e.getKey();
-            switch (e.getValue()) {
-            case DIRTY:
-              qual = mapKey.toString();
-              value = ByteUtils.toBytes(map.get(mapKey), field.schema().getValueType(), datumWriter);
-              if (col.isSuperColumn()) {
-                mutate.put(col.family, col.superColumn, qual, value);
-              } else {
-                mutate.put(col.family, qual, value);
-              }
-              break;
-            case DELETED:
-              qual = mapKey.toString();
-              if (col.isSuperColumn()) {
-                mutate.delete(col.family, col.superColumn, qual);
-              } else {
-                mutate.delete(col.family, qual);
-              }
-              break;
-            }
-          }
-        } else {
-          @SuppressWarnings({ "rawtypes", "unchecked" })
-          Set<Map.Entry> set = ((Map)o).entrySet();
-          for(@SuppressWarnings("rawtypes") Entry entry: set) {
-            qual = entry.getKey().toString();
-            value = ByteUtils.toBytes(entry.getValue().toString());
-            if (col.isSuperColumn()) {
-              mutate.put(col.family, col.superColumn, qual, value);
-            } else {
-              mutate.put(col.family, qual, value);
-            }
-          }
-        }
-        break;
-      case ARRAY:
-        if(o instanceof GenericArray) {
-          @SuppressWarnings("rawtypes")
-          GenericArray arr = (GenericArray) o;
-          int j=0;
-          for(Object item : arr) {
-            value = ByteUtils.toBytes(item.toString());
-            if (col.isSuperColumn()) {
-              mutate.put(col.family, col.superColumn, Integer.toString(j), value);
-            } else {
-              mutate.put(col.family, Integer.toString(j), value);
-            }
-            j++;
-          }
-        }
-        break;
-      default:
-        value = ByteUtils.toBytes(o, field.schema(), datumWriter);
-        if (col.isSuperColumn()) {
-          mutate.put(col.family, col.superColumn, col.column, value);
-        } else {
-          mutate.put(col.family, col.column, value);
-        }
-        break;
+      
+      SuperSlice<String, String, String> superSlice = superRow.getSuperSlice();
+      for (HSuperColumn<String, String, String> hSuperColumn: superSlice.getSuperColumns()) {
+        CassandraSuperColumn cassandraSuperColumn = new CassandraSuperColumn();
+        cassandraSuperColumn.setValue(hSuperColumn);
+        cassandraSuperColumn.setFamily(family);
+        cassandraRow.add(cassandraSuperColumn);
       }
     }
-
-    if(!mutate.isEmpty())
-      client.mutate(key.toString(), mutate);
   }
 
-  @Override
-  public boolean delete(K key) throws IOException {
-    Mutate mutate = new Mutate();
-    for (String family : mapping.getColumnFamilies()) {
-      mutate.deleteAll(family);
+  /**
+   * Writes the dirty fields of every buffered row to Cassandra, then empties the buffer.
+   * @see org.apache.gora.store.DataStore#flush()
+   */
+  @Override
+  public void flush() throws IOException {
+    for (Map.Entry<K, T> buffered: this.buffer.entrySet()) {
+      T persistent = buffered.getValue();
+      Schema rowSchema = persistent.getSchema();
+      for (Field column: rowSchema.getFields()) {
+        if (persistent.isDirty(column.pos())) {
+          addOrUpdateField((String) buffered.getKey(), column, persistent.get(column.pos()));
+        }
+      }
     }
-
-    client.mutate(key.toString(), mutate);
-    return true;
+    
+    this.buffer.clear();
   }
 
   @Override
-  public void flush() throws IOException { }
-
-  @Override
-  public void close() throws IOException {
-    client.close();
+  public T get(K key, String[] fields) throws IOException {
+    LOG.info("get " + key);
+    return null; // NOTE(review): stub - nothing is looked up; every call returns null
   }
 
   @Override
-  public Query<K, T> newQuery() {
-    return new CassandraQuery<K, T>(this);
+  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException {
+    // the query is not split: it is wrapped whole into one partition
+    PartitionQuery<K, T> whole = new PartitionQueryImpl<K, T>(query);
+    List<PartitionQuery<K, T>> result = new ArrayList<PartitionQuery<K, T>>();
+    result.add(whole);
+    return result;
   }
 
   @Override
-  public long deleteByQuery(Query<K, T> query) throws IOException {
-    // TODO Auto-generated method stub
-    return 0;
+  public String getSchemaName() {
+    LOG.info("get schema name");
+    return null; // NOTE(review): stub - only logs the call; no schema name is ever returned
   }
 
   @Override
-  public Result<K, T> execute(Query<K, T> query) throws IOException {
-    return new CassandraResult<K, T>(this, query, BATCH_COUNT);
+  public Query<K, T> newQuery() {
+    return new CassandraQuery<K, T>(this); // the new query is handed this store instance
   }
 
-  @Override
-  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
-  throws IOException {
-    List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>();
-
-    List<TokenRange> rangeList = client.describeRing();
-    for (TokenRange range : rangeList) {
-      List<String> tokens =
-        client.describeSplits(range.start_token, range.end_token, SPLIT_SIZE);
-      // turn the sub-ranges into InputSplits
-      String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
-      // hadoop needs hostname, not ip
-      for (int i = 0; i < endpoints.length; i++) {
-          endpoints[i] = InetAddress.getByName(endpoints[i]).getHostName();
-      }
-
-      for (int i = 1; i < tokens.size(); i++) {
-        CassandraPartitionQuery<K, T> partitionQuery =
-          new CassandraPartitionQuery<K, T>(query, tokens.get(i - 1), tokens.get(i), endpoints, SPLIT_SIZE);
-        partitions.add(partitionQuery);
+  /**
+   * Buffers a copy of the dirty fields of {@code value} under {@code key} until flushing.
+   * Nested records and StatefulHashMap values are copied; anything else is stored as-is.
+   * @see org.apache.gora.store.DataStore#put(java.lang.Object, org.apache.gora.persistency.Persistent)
+   */
+  @Override
+  @SuppressWarnings({ "unchecked", "rawtypes" }) // newInstance() and the map copy are untyped
+  public void put(K key, T value) throws IOException {
+    T p = (T) value.newInstance(new StateManagerImpl());
+    for (Field field: value.getSchema().getFields()) {
+      if (value.isDirty(field.pos())) {
+        Object fieldValue = value.get(field.pos());
+        Schema fieldSchema = field.schema();
+        switch (fieldSchema.getType()) {
+          case RECORD:
+            // a null or non-Persistent value is buffered unchanged instead of throwing
+            if (fieldValue instanceof Persistent) {
+              Persistent persistent = (Persistent) fieldValue;
+              Persistent newRecord = persistent.newInstance(new StateManagerImpl());
+              for (Field member: fieldSchema.getFields()) {
+                newRecord.put(member.pos(), persistent.get(member.pos()));
+              }
+              fieldValue = newRecord;
+            }
+            break;
+          case MAP:
+            if (fieldValue instanceof StatefulHashMap) { // same leniency as for records
+              fieldValue = new StatefulHashMap((StatefulHashMap) fieldValue);
+            }
+            break;
+        }
+        p.put(field.pos(), fieldValue);
       }
     }
-    return partitions;
-  }
-
-  private CassandraClient createClient() throws IOException {
-    String serverStr =
-      DataStoreFactory.findPropertyOrDie(properties, this, "servers");
-    String[] server1Parts = serverStr.split(",")[0].split(":");
-    try {
-      return new SimpleCassandraClient(server1Parts[0],
-          Integer.parseInt(server1Parts[1]), mapping.getKeySpace());
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  protected void readMapping(String filename) throws IOException {
-
-    mapping = new CassandraMapping();
-    columnMap = new HashMap<String, CassandraColumn>();
-
-    try {
-      SAXBuilder builder = new SAXBuilder();
-      Document doc = builder.build(getClass().getClassLoader()
-          .getResourceAsStream(filename));
-
-      List<Element> classes = doc.getRootElement().getChildren("class");
-
-      for(Element classElement: classes) {
-        if(classElement.getAttributeValue("keyClass").equals(keyClass.getCanonicalName())
-            && classElement.getAttributeValue("name").equals(
-                persistentClass.getCanonicalName())) {
-
-          String keySpace = classElement.getAttributeValue("keyspace");
-          mapping.setKeySpace(keySpace);
-          client = createClient();
-          Map<String, Map<String, String>> keySpaceDesc = client.describeKeySpace();
-          for (Entry<String, Map<String, String>> e : keySpaceDesc.entrySet()) {
-            boolean isSuper = e.getValue().get("Type").equals("Super");
-            mapping.addColumnFamily(e.getKey(), isSuper);
-          }
-
-          List<Element> fields = classElement.getChildren("field");
-
-          for(Element field:fields) {
-            String fieldName = field.getAttributeValue("name");
-            String path = field.getAttributeValue("path");
-            String[] parts = path.split(":");
-            String columnFamily = parts[0];
-            String superColumn = null;
-            String column = null;
-
-            boolean isSuper = mapping.isColumnFamilySuper(columnFamily);
-            if (isSuper) {
-              superColumn = parts[1];
-              if (parts.length == 3) {
-                column = parts[2];
+    
+    this.buffer.put(key, p);
+  }
+
+  /**
+   * Add a field to Cassandra according to its type.
+   * <p>
+   * STRING, INT, LONG, BYTES and FLOAT values are passed to the client's addColumn under
+   * the field name. Each member of a RECORD and each entry of a MAP is passed to
+   * addSubColumn under the field name, except empty arrays, which are skipped. Null
+   * records and maps are ignored; values of an unsupported class and fields of any
+   * other type are not written, only logged.
+   * @param key     the key of the row where the field should be added
+   * @param field   the Avro field representing a datum
+   * @param value   the field value
+   */
+  private void addOrUpdateField(String key, Field field, Object value) {
+    Schema schema = field.schema();
+    Type type = schema.getType();
+    // the field name is what every branch below passes to the client
+    String name = field.name();
+    switch (type) {
+      case STRING:
+      case INT:
+      case LONG:
+      case BYTES:
+      case FLOAT:
+        // all five types are written identically, through addColumn
+        this.cassandraClient.addColumn(key, name, value);
+        break;
+      // a record is written member by member
+      case RECORD:
+        if (value != null) {
+          if (value instanceof PersistentBase) {
+            PersistentBase record = (PersistentBase) value;
+            for (Field member: schema.getFields()) {
+              Object memberValue = record.get(member.pos());
+
+              // TODO: hack, do not store empty arrays
+              if (memberValue instanceof GenericArray<?>
+                  && ((GenericArray<?>) memberValue).size() == 0) {
+                continue;
               }
-            } else {
-              if (parts.length == 2) {
-                column = parts[1];
+
+              this.cassandraClient.addSubColumn(key, name, member.name(), memberValue);
+            }
+          } else {
+            // only PersistentBase records are written
+            LOG.info("Record not supported: " + value.toString());
+          }
+        }
+        break;
+      // a map is written entry by entry
+      case MAP:
+        if (value != null) {
+          if (value instanceof StatefulHashMap<?, ?>) {
+            //TODO cast to stateful map and only write dirty keys
+            Map<Utf8, Object> entries = (Map<Utf8, Object>) value;
+            for (Utf8 mapKey: entries.keySet()) {
+              Object entryValue = entries.get(mapKey);
+
+              // TODO: hack, do not store empty arrays
+              if (entryValue instanceof GenericArray<?>
+                  && ((GenericArray<?>) entryValue).size() == 0) {
+                continue;
               }
+
+              this.cassandraClient.addSubColumn(key, name, mapKey.toString(), entryValue);
             }
-
-            columnMap.put(fieldName,
-                new CassandraColumn(columnFamily, superColumn, column));
+          } else {
+            // only StatefulHashMap maps are written
+            LOG.info("Map not supported: " + value.toString());
           }
-
-          break;
         }
-      }
-    } catch(Exception ex) {
-      throw new IOException(ex);
+        break;
+      default:
+        // any other Avro type is skipped
+        LOG.info("Type not considered: " + type.name());
     }
   }
-}
\ No newline at end of file
+
+  @Override
+  public boolean schemaExists() throws IOException {
+    LOG.info("schema exists");
+    return false; // NOTE(review): stub - always reports that the schema does not exist
+  }
+
+}

Modified: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java?rev=1149420&r1=1149419&r2=1149420&view=diff
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java (original)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java Fri Jul 22 00:33:59 2011
@@ -36,8 +36,8 @@ import org.slf4j.LoggerFactory;
 public class GoraRecordReader<K, T extends Persistent> extends RecordReader<K,T> {
   public static final Logger LOG = LoggerFactory.getLogger(GoraRecordReader.class);
 
-  private static final String BUFFER_LIMIT_READ_NAME = "gora.buffer.read.limit";
-  private static final int BUFFER_LIMIT_READ_VALUE = 10000;
+  public static final String BUFFER_LIMIT_READ_NAME = "gora.buffer.read.limit";
+  public static final int BUFFER_LIMIT_READ_VALUE = 10000;
 
   protected Query<K,T> query;
   protected Result<K,T> result;