You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2010/01/29 17:24:00 UTC

svn commit: r904543 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/avro: ./ AvroValidation.java CassandraDaemon.java CassandraServer.java KeyspaceNotDefinedException.java RecordFactory.java

Author: eevans
Date: Fri Jan 29 16:24:00 2010
New Revision: 904543

URL: http://svn.apache.org/viewvc?rev=904543&view=rev
Log:
stubbed out a few rpc methods

- get
- insert
- batch_insert
- get_api_version

Patch by eevans

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/
    incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/KeyspaceNotDefinedException.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/RecordFactory.java

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java?rev=904543&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java Fri Jan 29 16:24:00 2010
@@ -0,0 +1,168 @@
+package org.apache.cassandra.avro;
+
+import java.util.Arrays;
+import org.apache.avro.util.Utf8;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.MarshalException;
+
+import static org.apache.cassandra.avro.ErrorFactory.newInvalidRequestException;
+import static org.apache.cassandra.avro.RecordFactory.newColumnPath;
+
+/**
+ * The Avro analogue to org.apache.cassandra.service.ThriftValidation
+ */
+public class AvroValidation {
+    // FIXME: could use method in ThriftValidation
+    /**
+     * Rejects empty row keys.
+     *
+     * @param key row key to check
+     * @throws InvalidRequestException if {@code key} is the empty string
+     */
+    static void validateKey(String key) throws InvalidRequestException
+    {
+        if (key.isEmpty())
+            throw newInvalidRequestException("Key may not be empty");
+    }
+
+    // FIXME: could use method in ThriftValidation
+    /**
+     * Checks that the keyspace is one of the tables known to DatabaseDescriptor.
+     *
+     * @param keyspace keyspace name to look up
+     * @throws KeyspaceNotDefinedException if the name is not in DatabaseDescriptor.getTables()
+     */
+    static void validateKeyspace(String keyspace) throws KeyspaceNotDefinedException
+    {
+        if (!DatabaseDescriptor.getTables().contains(keyspace))
+            throw new KeyspaceNotDefinedException(new Utf8("Keyspace " + keyspace + " does not exist in this schema."));
+    }
+
+    // FIXME: could use method in ThriftValidation
+    /**
+     * Checks that a column family name is non-empty and configured for the keyspace.
+     *
+     * @param keyspace keyspace the column family is looked up in
+     * @param columnFamily column family name to check
+     * @return the type string DatabaseDescriptor reports for the column family
+     * @throws InvalidRequestException if the name is empty or no type is configured for it
+     */
+    static String validateColumnFamily(String keyspace, String columnFamily) throws InvalidRequestException
+    {
+        if (columnFamily.isEmpty())
+            throw newInvalidRequestException("non-empty columnfamily is required");
+
+        String cfType = DatabaseDescriptor.getColumnFamilyType(keyspace, columnFamily);
+        if (cfType == null)
+            throw newInvalidRequestException("unconfigured columnfamily " + columnFamily);
+
+        return cfType;
+    }
+
+    /**
+     * Validates a column path: the keyspace and column family must exist, and the
+     * column / super column parts must be consistent with the column family type.
+     *
+     * @param keyspace keyspace the path refers to
+     * @param cp path to check; its column_family field is dereferenced unconditionally
+     * @throws InvalidRequestException if any part of the path is rejected
+     */
+    static void validateColumnPath(String keyspace, ColumnPath cp) throws InvalidRequestException
+    {
+        validateKeyspace(keyspace);
+        String column_family = cp.column_family.toString();
+        String cfType = validateColumnFamily(keyspace, column_family);
+
+        byte[] column = null, super_column = null;
+        if (cp.super_column != null) super_column = cp.super_column.array();
+        if (cp.column != null) column = cp.column.array();
+
+        if (cfType.equals("Standard"))
+        {
+            if (super_column != null)
+                throw newInvalidRequestException("supercolumn parameter is invalid for standard CF " + column_family);
+
+            if (column == null)
+                throw newInvalidRequestException("column parameter is not optional for standard CF " + column_family);
+        }
+        else
+        {
+            if (super_column == null)
+                throw newInvalidRequestException("supercolumn parameter is not optional for super CF " + column_family);
+        }
+
+        if (column != null)
+            validateColumns(keyspace, column_family, super_column, Arrays.asList(column));
+        if (super_column != null)
+            validateColumns(keyspace, column_family, null, Arrays.asList(super_column));
+    }
+
+    // FIXME: could use method in ThriftValidation
+    /**
+     * Validates column names and, when given, the super column that contains them.
+     *
+     * @param keyspace keyspace of the column family
+     * @param cfName column family name
+     * @param superColumnName enclosing super column name, or null if there is none
+     * @param columnNames names checked for length and against the comparator
+     * @throws InvalidRequestException if a name is empty, too long, or fails comparator validation
+     */
+    static void validateColumns(String keyspace, String cfName, byte[] superColumnName, Iterable<byte[]> columnNames)
+    throws InvalidRequestException
+    {
+        if (superColumnName != null)
+        {
+            if (superColumnName.length > IColumn.MAX_NAME_LENGTH)
+                throw newInvalidRequestException("supercolumn name length must not be greater than " + IColumn.MAX_NAME_LENGTH);
+            if (superColumnName.length == 0)
+                throw newInvalidRequestException("supercolumn name must not be empty");
+            if (!DatabaseDescriptor.getColumnFamilyType(keyspace, cfName).equals("Super"))
+                throw newInvalidRequestException("supercolumn specified to ColumnFamily " + cfName + " containing normal columns");
+        }
+
+        AbstractType comparator = ColumnFamily.getComparatorFor(keyspace, cfName, superColumnName);
+        for (byte[] name : columnNames)
+        {
+            if (name.length > IColumn.MAX_NAME_LENGTH)
+                throw newInvalidRequestException("column name length must not be greater than " + IColumn.MAX_NAME_LENGTH);
+            if (name.length == 0)
+                throw newInvalidRequestException("column name must not be empty");
+
+            try
+            {
+                comparator.validate(name);
+            }
+            catch (MarshalException e)
+            {
+                throw newInvalidRequestException(e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Validates every column carried by a ColumnOrSuperColumn.
+     *
+     * @param keyspace keyspace of the column family
+     * @param cfName column family name
+     * @param cosc record to check; at least one of its two fields must be set
+     * @throws InvalidRequestException if both fields are null or a contained path is invalid
+     */
+    static void validateColumnOrSuperColumn(String keyspace, String cfName, ColumnOrSuperColumn cosc)
+    throws InvalidRequestException
+    {
+        // A bare column has no enclosing super column, so the path carries null there;
+        // reading cosc.super_column.name here would throw a NullPointerException.
+        if (cosc.column != null)
+            AvroValidation.validateColumnPath(keyspace, newColumnPath(cfName, null, cosc.column.name));
+
+        if (cosc.super_column != null)
+            for (Column c : cosc.super_column.columns)
+                AvroValidation.validateColumnPath(keyspace, newColumnPath(cfName, cosc.super_column.name, c.name));
+
+        if ((cosc.column == null) && (cosc.super_column == null))
+            throw newInvalidRequestException("ColumnOrSuperColumn must have one or both of Column or SuperColumn");
+    }
+}

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java?rev=904543&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java Fri Jan 29 16:24:00 2010
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import org.apache.avro.ipc.SocketServer;
+import org.apache.avro.specific.SpecificResponder;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.CompactionManager;
+import org.apache.cassandra.db.RecoveryManager;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PropertyConfigurator;
+
+/**
+ * The Avro analogue to org.apache.cassandra.service.CassandraDaemon.
+ *
+ */
+public class CassandraDaemon {
+    private static Logger logger = Logger.getLogger(CassandraDaemon.class);
+    // assigned by start(); stays null until then
+    private SocketServer server;
+    private InetAddress listenAddr;
+    private int listenPort;
+
+    /**
+     * Prepares the node: configures log4j, picks the listen address and port,
+     * installs a default uncaught-exception handler, opens every keyspace,
+     * replays the log if necessary, checks for compaction candidates and
+     * initializes the storage service.
+     *
+     * @throws IOException propagated from the startup calls made here
+     */
+    private void setup() throws IOException
+    {
+        // log4j
+        String file = System.getProperty("storage-config") + File.separator + "log4j.properties";
+        PropertyConfigurator.configure(file);
+
+        // NOTE: the Avro service reuses the configured Thrift port and address
+        listenPort = DatabaseDescriptor.getThriftPort();
+        listenAddr = DatabaseDescriptor.getThriftAddress();
+
+        /*
+         * If ThriftAddress was left completely unconfigured, then assume
+         * the same default as ListenAddress
+         */
+        if (listenAddr == null)
+            listenAddr = FBUtilities.getLocalAddress();
+
+        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
+        {
+            public void uncaughtException(Thread t, Throwable e)
+            {
+                logger.error("Fatal exception in thread " + t, e);
+                if (e instanceof OutOfMemoryError)
+                {
+                    System.exit(100);
+                }
+            }
+        });
+
+        // initialize keyspaces
+        for (String table : Table.getAllTableNames())
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("opening keyspace " + table);
+            Table.open(table);
+        }
+
+        // replay the log if necessary and check for compaction candidates
+        RecoveryManager.doRecovery();
+        CompactionManager.instance.checkAllColumnFamilies();
+
+        // start server internals
+        StorageService.instance.initServer();
+
+    }
+
+    /** hook for JSVC */
+    public void load(String[] arguments) throws IOException
+    {
+        setup();
+    }
+
+    /**
+     * Hook for JSVC: creates the Avro SocketServer on the address and port chosen by setup().
+     *
+     * @throws IOException propagated from the calls made here
+     */
+    public void start() throws IOException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug(String.format("Binding avro service to %s:%s", listenAddr, listenPort));
+        InetSocketAddress socketAddress = new InetSocketAddress(listenAddr, listenPort);
+        SpecificResponder responder = new SpecificResponder(Cassandra.class, new CassandraServer());
+
+        logger.info("Cassandra starting up...");
+        server = new SocketServer(responder, socketAddress);
+    }
+
+    /** hook for JSVC */
+    public void stop()
+    {
+        logger.info("Cassandra shutting down...");
+        // start() may never have run (or may have failed), leaving nothing to close
+        if (server != null)
+            server.close();
+    }
+
+    /** hook for JSVC */
+    public void destroy()
+    {
+    }
+
+    /**
+     * Command-line entry point: runs setup() and start(), and exits with status 3
+     * if either throws. Honors the "cassandra-pidfile" and "cassandra-foreground"
+     * system properties.
+     */
+    public static void main(String[] args) {
+        CassandraDaemon daemon = new CassandraDaemon();
+        String pidFile = System.getProperty("cassandra-pidfile");
+
+        try
+        {
+            daemon.setup();
+
+            if (pidFile != null)
+            {
+                new File(pidFile).deleteOnExit();
+            }
+
+            if (System.getProperty("cassandra-foreground") == null)
+            {
+                System.out.close();
+                System.err.close();
+            }
+
+            daemon.start();
+        }
+        catch (Throwable e)
+        {
+            String msg = "Exception encountered during startup.";
+            logger.error(msg, e);
+
+            // try to warn user on stdout too, if we haven't already detached
+            System.out.println(msg);
+            e.printStackTrace();
+
+            System.exit(3);
+        }
+    }
+
+}

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=904543&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Fri Jan 29 16:24:00 2010
@@ -0,0 +1,379 @@
+package org.apache.cassandra.avro;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.util.Utf8;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.SliceByNamesReadCommand;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.log4j.Logger;
+import static org.apache.cassandra.avro.RecordFactory.*;
+import static org.apache.cassandra.avro.ErrorFactory.*;
+
+/**
+ * Avro implementation of the Cassandra RPC interface: get, insert, batch_insert
+ * and get_api_version.
+ */
+public class CassandraServer implements Cassandra {
+    private static Logger logger = Logger.getLogger(CassandraServer.class);
+
+    private final static GenericArray<Column> EMPTY_SUBCOLUMNS = new GenericData.Array<Column>(0, Schema.parse("{\"type\":\"array\",\"items\":" + Column.SCHEMA$ + "}"));
+    private final static Utf8 API_VERSION = new Utf8("0.0.0");
+
+    /**
+     * Fetches the single column or super column addressed by a column path.
+     *
+     * @throws InvalidRequestException if the path, key or consistency level is rejected
+     * @throws NotFoundException if the result has neither a column nor a super column
+     */
+    @Override
+    public ColumnOrSuperColumn get(Utf8 keyspace, Utf8 key, ColumnPath columnPath, ConsistencyLevel consistencyLevel)
+    throws AvroRemoteException, InvalidRequestException, NotFoundException, UnavailableException, TimedOutException {
+        if (logger.isDebugEnabled())
+            logger.debug("get");
+
+        ColumnOrSuperColumn column = multigetInternal(keyspace.toString(), Arrays.asList(key.toString()), columnPath, consistencyLevel).get(key.toString());
+
+        if ((column.column == null) && (column.super_column == null))
+        {
+            throw newNotFoundException("Path not found");
+        }
+        return column;
+    }
+
+    /**
+     * Reads one column path for each of the given keys.
+     *
+     * @return a map with an entry for every key; a key whose column is missing or
+     *         marked for delete maps to a freshly constructed, unpopulated ColumnOrSuperColumn
+     */
+    private Map<String, ColumnOrSuperColumn> multigetInternal(String keyspace, List<String> keys, ColumnPath cp, ConsistencyLevel level)
+    throws InvalidRequestException, UnavailableException, TimedOutException
+    {
+        AvroValidation.validateColumnPath(keyspace, cp);
+
+        // FIXME: This is repetitive.
+        byte[] column, super_column;
+        column = cp.column == null ? null : cp.column.array();
+        super_column = cp.super_column == null ? null : cp.super_column.array();
+
+        QueryPath path = new QueryPath(cp.column_family.toString(), column == null ? null : super_column);
+        List<byte[]> nameAsList = Arrays.asList(column == null ? super_column : column);
+        List<ReadCommand> commands = new ArrayList<ReadCommand>();
+        for (String key: keys)
+        {
+            AvroValidation.validateKey(key);
+            commands.add(new SliceByNamesReadCommand(keyspace, key, path, nameAsList));
+        }
+
+        Map<String, ColumnOrSuperColumn> columnFamiliesMap = new HashMap<String, ColumnOrSuperColumn>();
+        Map<String, Collection<IColumn>> columnsMap = multigetColumns(commands, level);
+
+        for (ReadCommand command: commands)
+        {
+            ColumnOrSuperColumn columnorsupercolumn;
+
+            Collection<IColumn> columns = columnsMap.get(command.key);
+            if (columns == null)
+            {
+               columnorsupercolumn = new ColumnOrSuperColumn();
+            }
+            else
+            {
+                assert columns.size() == 1;
+                IColumn col = columns.iterator().next();
+
+
+                if (col.isMarkedForDelete())
+                {
+                    columnorsupercolumn = new ColumnOrSuperColumn();
+                }
+                else
+                {
+                    columnorsupercolumn = col instanceof org.apache.cassandra.db.Column
+                                          ? newColumnOrSuperColumn(newColumn(col.name(), col.value(), col.timestamp()))
+                                          : newColumnOrSuperColumn(newSuperColumn(col.name(), avronateSubColumns(col.getSubColumns())));
+                }
+
+            }
+            columnFamiliesMap.put(command.key, columnorsupercolumn);
+        }
+
+        return columnFamiliesMap;
+    }
+
+    /**
+     * Executes the read commands and gathers the resulting columns per key.
+     *
+     * @return map from key to its columns; keys without any columns are left out
+     */
+    private Map<String, Collection<IColumn>> multigetColumns(List<ReadCommand> commands, ConsistencyLevel level)
+    throws InvalidRequestException, UnavailableException, TimedOutException
+    {
+        Map<String, ColumnFamily> cfamilies = readColumnFamily(commands, level);
+        Map<String, Collection<IColumn>> columnFamiliesMap = new HashMap<String, Collection<IColumn>>();
+
+        for (ReadCommand command: commands)
+        {
+            ColumnFamily cfamily = cfamilies.get(command.key);
+            if (cfamily == null)
+                continue;
+
+            Collection<IColumn> columns = null;
+            if (command.queryPath.superColumnName != null)
+            {
+                IColumn column = cfamily.getColumn(command.queryPath.superColumnName);
+                if (column != null)
+                {
+                    columns = column.getSubColumns();
+                }
+            }
+            else
+            {
+                columns = cfamily.getSortedColumns();
+            }
+
+            if (columns != null && columns.size() != 0)
+            {
+                columnFamiliesMap.put(command.key, columns);
+            }
+        }
+
+        return columnFamiliesMap;
+    }
+
+    /**
+     * Runs the read commands through StorageProxy and indexes the returned rows by key.
+     *
+     * @throws InvalidRequestException for consistency levels ZERO and ALL
+     * @throws TimedOutException if StorageProxy throws a TimeoutException
+     * @throws UnavailableException if StorageProxy throws the Thrift UnavailableException
+     */
+    protected Map<String, ColumnFamily> readColumnFamily(List<ReadCommand> commands, ConsistencyLevel consistency)
+    throws InvalidRequestException, UnavailableException, TimedOutException
+    {
+        // TODO - Support multiple column families per row, right now row only contains 1 column family
+        Map<String, ColumnFamily> columnFamilyKeyMap = new HashMap<String,ColumnFamily>();
+
+        if (consistency == ConsistencyLevel.ZERO)
+            throw newInvalidRequestException("Consistency level zero may not be applied to read operations");
+
+        if (consistency == ConsistencyLevel.ALL)
+            throw newInvalidRequestException("Consistency level all is not yet supported on read operations");
+
+        List<Row> rows;
+        try
+        {
+            rows = StorageProxy.readProtocol(commands, thriftConsistencyLevel(consistency));
+        }
+        catch (TimeoutException e)
+        {
+            throw new TimedOutException();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        // FIXME: This suckage brought to you by StorageService and StorageProxy
+        // which throw Thrift exceptions directly.
+        catch (org.apache.cassandra.thrift.UnavailableException e)
+        {
+            throw new UnavailableException();
+        }
+
+        for (Row row: rows)
+        {
+            columnFamilyKeyMap.put(row.key, row.cf);
+        }
+
+        return columnFamilyKeyMap;
+    }
+
+    // Don't playa hate, avronate.
+    /**
+     * Converts internal sub-columns to an Avro array, skipping those marked for delete.
+     *
+     * @return EMPTY_SUBCOLUMNS when {@code columns} is null or empty, otherwise a new array
+     */
+    public GenericArray<Column> avronateSubColumns(Collection<IColumn> columns)
+    {
+        if (columns == null || columns.isEmpty())
+            return EMPTY_SUBCOLUMNS;
+
+        // GenericData.Array takes the schema of the array itself (as EMPTY_SUBCOLUMNS does),
+        // not the schema of its elements.
+        GenericData.Array<Column> avroColumns = new GenericData.Array<Column>(columns.size(), Schema.createArray(Column.SCHEMA$));
+
+        for (IColumn column : columns)
+        {
+            if (column.isMarkedForDelete())
+                continue;
+
+            Column avroColumn = newColumn(column.name(), column.value(), column.timestamp());
+            avroColumns.add(avroColumn);
+        }
+
+        return avroColumns;
+    }
+
+    /**
+     * Writes a single column value.
+     *
+     * @throws InvalidRequestException if the key or column path fails validation, or adding the column raises a MarshalException
+     */
+    @Override
+    public Void insert(Utf8 keyspace, Utf8 key, ColumnPath cp, ByteBuffer value, long timestamp, ConsistencyLevel consistencyLevel)
+    throws AvroRemoteException, InvalidRequestException, UnavailableException, TimedOutException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("insert");
+
+        // FIXME: This is repetitive.
+        byte[] column, super_column;
+        column = cp.column == null ? null : cp.column.array();
+        super_column = cp.super_column == null ? null : cp.super_column.array();
+        String column_family = cp.column_family.toString();
+        String keyspace_string = keyspace.toString();
+
+        // validate the row key; the keyspace itself is checked by validateColumnPath
+        AvroValidation.validateKey(key.toString());
+        AvroValidation.validateColumnPath(keyspace_string, cp);
+
+        RowMutation rm = new RowMutation(keyspace_string, key.toString());
+        try
+        {
+            rm.add(new QueryPath(column_family, super_column, column), value.array(), timestamp);
+        }
+        catch (MarshalException e)
+        {
+            throw newInvalidRequestException(e.getMessage());
+        }
+        doInsert(consistencyLevel, rm);
+
+        return null;
+    }
+
+    /**
+     * Applies a mutation: StorageProxy.mutateBlocking for every level except ZERO,
+     * StorageProxy.mutate for ZERO.
+     */
+    private void doInsert(ConsistencyLevel consistency, RowMutation rm) throws UnavailableException, TimedOutException
+    {
+        if (consistency != ConsistencyLevel.ZERO)
+        {
+            try
+            {
+                StorageProxy.mutateBlocking(Arrays.asList(rm), thriftConsistencyLevel(consistency));
+            }
+            catch (TimeoutException e)
+            {
+                throw new TimedOutException();
+            }
+            catch (org.apache.cassandra.thrift.UnavailableException thriftE)
+            {
+                throw new UnavailableException();
+            }
+        }
+        else
+        {
+            StorageProxy.mutate(Arrays.asList(rm));
+        }
+    }
+
+    /**
+     * Writes several columns, grouped by column family, to a single row.
+     *
+     * @param cfmap column family name mapped to the columns / super columns to write
+     */
+    @Override
+    public Void batch_insert(Utf8 keyspace, Utf8 key, Map<Utf8, GenericArray<ColumnOrSuperColumn>> cfmap, ConsistencyLevel consistency)
+    throws AvroRemoteException, InvalidRequestException, UnavailableException, TimedOutException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("batch_insert");
+
+        String keyString = key.toString();
+        String keyspaceString = keyspace.toString();
+
+        AvroValidation.validateKey(keyString);
+
+        for (Utf8 cfName : cfmap.keySet())
+        {
+            for (ColumnOrSuperColumn cosc : cfmap.get(cfName))
+                AvroValidation.validateColumnOrSuperColumn(keyspaceString, cfName.toString(), cosc);
+        }
+
+        doInsert(consistency, getRowMutation(keyspaceString, keyString, cfmap));
+        return null;
+    }
+
+    // FIXME: This is copypasta from o.a.c.db.RowMutation, (RowMutation.getRowMutation uses Thrift types directly).
+    /** Builds one RowMutation holding every column found in {@code cfmap}. */
+    private static RowMutation getRowMutation(String keyspace, String key, Map<Utf8, GenericArray<ColumnOrSuperColumn>> cfmap)
+    {
+        RowMutation rm = new RowMutation(keyspace, key.trim());
+        for (Map.Entry<Utf8, GenericArray<ColumnOrSuperColumn>> entry : cfmap.entrySet())
+        {
+            String cfName = entry.getKey().toString();
+            for (ColumnOrSuperColumn cosc : entry.getValue())
+            {
+                if (cosc.column == null)
+                {
+                    assert cosc.super_column != null;
+                    for (Column column : cosc.super_column.columns)
+                    {
+                        QueryPath path = new QueryPath(cfName, cosc.super_column.name.array(), column.name.array());
+                        rm.add(path, column.value.array(), column.timestamp);
+                    }
+                }
+                else
+                {
+                    assert cosc.super_column == null;
+                    QueryPath path = new QueryPath(cfName, null, cosc.column.name.array());
+                    rm.add(path, cosc.column.value.array(), cosc.column.timestamp);
+                }
+            }
+        }
+        return rm;
+    }
+
+    /** Maps an Avro consistency level to the Thrift constant of the same name (null if unmatched). */
+    private org.apache.cassandra.thrift.ConsistencyLevel thriftConsistencyLevel(ConsistencyLevel consistency)
+    {
+        switch (consistency)
+        {
+            case ZERO: return org.apache.cassandra.thrift.ConsistencyLevel.ZERO;
+            case ONE: return org.apache.cassandra.thrift.ConsistencyLevel.ONE;
+            case QUORUM: return org.apache.cassandra.thrift.ConsistencyLevel.QUORUM;
+            case DCQUORUM: return org.apache.cassandra.thrift.ConsistencyLevel.DCQUORUM;
+            case DCQUORUMSYNC: return org.apache.cassandra.thrift.ConsistencyLevel.DCQUORUMSYNC;
+            case ALL: return org.apache.cassandra.thrift.ConsistencyLevel.ALL;
+        }
+        return null;
+    }
+
+    /** Returns the API_VERSION constant. */
+    @Override
+    public Utf8 get_api_version() throws AvroRemoteException
+    {
+        return API_VERSION;
+    }
+}

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/KeyspaceNotDefinedException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/KeyspaceNotDefinedException.java?rev=904543&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/KeyspaceNotDefinedException.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/KeyspaceNotDefinedException.java Fri Jan 29 16:24:00 2010
@@ -0,0 +1,19 @@
+package org.apache.cassandra.avro;
+
+import org.apache.avro.util.Utf8;
+
+// XXX: This is an analogue to org.apache.cassandra.db.KeyspaceNotDefinedException
+@SuppressWarnings("serial")
+public class KeyspaceNotDefinedException extends InvalidRequestException
+{
+    /**
+     * Creates the exception with the given explanation.
+     *
+     * @param why message assigned to the inherited {@code why} field
+     */
+    public KeyspaceNotDefinedException(Utf8 why)
+    {
+        super();
+        this.why = why;
+    }
+}

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/RecordFactory.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/RecordFactory.java?rev=904543&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/RecordFactory.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/avro/RecordFactory.java Fri Jan 29 16:24:00 2010
@@ -0,0 +1,133 @@
+package org.apache.cassandra.avro;
+
+import java.nio.ByteBuffer;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.util.Utf8;
+
+/** Static helpers that populate Column, SuperColumn, ColumnOrSuperColumn and ColumnPath records. */
+class RecordFactory
+{
+    /** Builds a Column from already-wrapped name and value buffers. */
+    static Column newColumn(ByteBuffer name, ByteBuffer value, long timestamp)
+    {
+        Column column = new Column();
+        column.name = name;
+        column.value = value;
+        column.timestamp = timestamp;
+        return column;
+    }
+
+    /** Builds a Column, wrapping the byte arrays in ByteBuffers. */
+    static Column newColumn(byte[] name, byte[] value, long timestamp)
+    {
+        return newColumn(ByteBuffer.wrap(name), ByteBuffer.wrap(value), timestamp);
+    }
+
+    /** Builds a SuperColumn with the given name and sub-columns. */
+    static SuperColumn newSuperColumn(ByteBuffer name, GenericArray<Column> columns)
+    {
+        SuperColumn column = new SuperColumn();
+        column.name = name;
+        column.columns = columns;
+        return column;
+    }
+
+    /** Builds a SuperColumn, wrapping the name bytes in a ByteBuffer. */
+    static SuperColumn newSuperColumn(byte[] name, GenericArray<Column> columns)
+    {
+        return newSuperColumn(ByteBuffer.wrap(name), columns);
+    }
+
+    /** Wraps a Column; the super_column field is left unset. */
+    static ColumnOrSuperColumn newColumnOrSuperColumn(Column column)
+    {
+        ColumnOrSuperColumn col = new ColumnOrSuperColumn();
+        col.column = column;
+        return col;
+    }
+
+    /** Wraps a SuperColumn; the column field is left unset. */
+    static ColumnOrSuperColumn newColumnOrSuperColumn(SuperColumn superColumn)
+    {
+        ColumnOrSuperColumn column = new ColumnOrSuperColumn();
+        column.super_column = superColumn;
+        return column;
+    }
+
+    /**
+     * Builds a ColumnPath.
+     *
+     * @param cfName column family name, stored as a Utf8 in column_family
+     * @param superColumn super column name, or null
+     * @param column column name, or null
+     */
+    static ColumnPath newColumnPath(String cfName, ByteBuffer superColumn, ByteBuffer column)
+    {
+        ColumnPath cPath = new ColumnPath();
+        // column_family must be populated: AvroValidation.validateColumnPath dereferences it
+        cPath.column_family = new Utf8(cfName);
+        cPath.super_column = superColumn;
+        cPath.column = column;
+        return cPath;
+    }
+}
+
+/** Static helpers that build the RPC exception types with their why field set. */
+class ErrorFactory
+{
+    /** Builds an InvalidRequestException whose why field is {@code why}. */
+    static InvalidRequestException newInvalidRequestException(Utf8 why)
+    {
+        InvalidRequestException exception = new InvalidRequestException();
+        exception.why = why;
+        return exception;
+    }
+
+    /** String overload of {@link #newInvalidRequestException(Utf8)}. */
+    static InvalidRequestException newInvalidRequestException(String why)
+    {
+        return newInvalidRequestException(new Utf8(why));
+    }
+
+    /** Builds a NotFoundException whose why field is {@code why}. */
+    static NotFoundException newNotFoundException(Utf8 why)
+    {
+        NotFoundException exception = new NotFoundException();
+        exception.why = why;
+        return exception;
+    }
+
+    /** String overload of {@link #newNotFoundException(Utf8)}. */
+    static NotFoundException newNotFoundException(String why)
+    {
+        return newNotFoundException(new Utf8(why));
+    }
+
+    /** Builds a TimedOutException whose why field is {@code why}. */
+    static TimedOutException newTimedOutException(Utf8 why)
+    {
+        TimedOutException exception = new TimedOutException();
+        exception.why = why;
+        return exception;
+    }
+
+    /** String overload of {@link #newTimedOutException(Utf8)}. */
+    static TimedOutException newTimedOutException(String why)
+    {
+        return newTimedOutException(new Utf8(why));
+    }
+
+    /** Builds an UnavailableException whose why field is {@code why}. */
+    static UnavailableException newUnavailableException(Utf8 why)
+    {
+        UnavailableException exception = new UnavailableException();
+        exception.why = why;
+        return exception;
+    }
+
+    /** String overload of {@link #newUnavailableException(Utf8)}. */
+    static UnavailableException newUnavailableException(String why)
+    {
+        return newUnavailableException(new Utf8(why));
+    }
+}
\ No newline at end of file