You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/26 23:01:36 UTC

svn commit: r979440 - in /cassandra/trunk: contrib/word_count/src/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/hadoop/

Author: jbellis
Date: Mon Jul 26 21:01:36 2010
New Revision: 979440

URL: http://svn.apache.org/viewvc?rev=979440&view=rev
Log:
Finish CASSANDRA-1280 for trunk (the merge from 0.6 left a lot of the new code unfinished). Bonus: contrib/word_count has been tested and is working again. Patch by jbellis.

Modified:
    cassandra/trunk/contrib/word_count/src/WordCount.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java

Modified: cassandra/trunk/contrib/word_count/src/WordCount.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/src/WordCount.java?rev=979440&r1=979439&r2=979440&view=diff
==============================================================================
--- cassandra/trunk/contrib/word_count/src/WordCount.java (original)
+++ cassandra/trunk/contrib/word_count/src/WordCount.java Mon Jul 26 21:01:36 2010
@@ -54,7 +54,6 @@ public class WordCount extends Configure
     static final String COLUMN_FAMILY = "Standard1";
     private static final String CONF_COLUMN_NAME = "columnname";
     private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count";
-    static final int RING_DELAY = 3000; // this is enough for testing a single server node; may need more for a real cluster
 
     public static void main(String[] args) throws Exception
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=979440&r1=979439&r2=979440&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Mon Jul 26 21:01:36 2010
@@ -113,9 +113,8 @@ public class DatabaseDescriptor
     {
         try
         {
-            
             configFileName = getStorageConfigPath();
-            
+
             if (logger.isDebugEnabled())
                 logger.info("Loading settings from " + configFileName);
             
@@ -647,7 +646,6 @@ public class DatabaseDescriptor
 
     public static AbstractType getComparator(String compareWith) throws ConfigurationException
     {
-        logger.info(compareWith);
         Class<? extends AbstractType> typeClass;
         
         if (compareWith == null)

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=979440&r1=979439&r2=979440&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Mon Jul 26 21:01:36 2010
@@ -41,7 +41,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapreduce.*;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
 /**
@@ -170,18 +173,8 @@ public class ColumnFamilyInputFormat ext
     private List<String> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
     {
         // TODO handle failure of range replicas & retry
-        TSocket socket = new TSocket(range.endpoints.get(0), ConfigHelper.getThriftPort(conf));
-        TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
-        Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+        Cassandra.Client client = createConnection(range.endpoints.get(0), ConfigHelper.getRpcPort(conf), true);
         int splitsize = ConfigHelper.getInputSplitSize(conf);
-        try
-        {
-            socket.open();
-        }
-        catch (TTransportException e)
-        {
-            throw new IOException(e);
-        }
         List<String> splits;
         try
         {
@@ -194,19 +187,25 @@ public class ColumnFamilyInputFormat ext
         return splits;
     }
 
-    private List<TokenRange> getRangeMap(Configuration conf) throws IOException
+    private static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws IOException
     {
-        TSocket socket = new TSocket(ConfigHelper.getInitialAddress(conf), ConfigHelper.getThriftPort(conf));
-        TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
-        Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+        TSocket socket = new TSocket(host, port);
+        TTransport trans = framed ? new TFramedTransport(socket) : socket;
         try
         {
-            socket.open();
+            trans.open();
         }
         catch (TTransportException e)
         {
-            throw new IOException(e);
+            throw new IOException("unable to connect to server", e);
         }
+        return new Cassandra.Client(new TBinaryProtocol(trans));
+    }
+
+    private List<TokenRange> getRangeMap(Configuration conf) throws IOException
+    {
+        Cassandra.Client client = createConnection(ConfigHelper.getInitialAddress(conf), ConfigHelper.getRpcPort(conf), true);
+
         List<TokenRange> map;
         try
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java?rev=979440&r1=979439&r2=979440&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java Mon Jul 26 21:01:36 2010
@@ -26,9 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.cassandra.auth.AllowAllAuthenticator;
 import org.apache.cassandra.auth.SimpleAuthenticator;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.thrift.AuthenticationException;
@@ -44,6 +42,7 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -151,16 +150,18 @@ public class ColumnFamilyOutputFormat ex
     public static Cassandra.Client createAuthenticatedClient(TSocket socket, JobContext context)
     throws InvalidRequestException, TException, AuthenticationException, AuthorizationException
     {
-        TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
+        TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket));
         Cassandra.Client client = new Cassandra.Client(binaryProtocol);
         socket.open();
         client.set_keyspace(ConfigHelper.getOutputKeyspace(context.getConfiguration()));
-        Map<String, String> creds = new HashMap<String, String>();
-        creds.put(SimpleAuthenticator.USERNAME_KEY, ConfigHelper.getOutputKeyspaceUserName(context.getConfiguration()));
-        creds.put(SimpleAuthenticator.PASSWORD_KEY, ConfigHelper.getOutputKeyspacePassword(context.getConfiguration()));
-        AuthenticationRequest authRequest = new AuthenticationRequest(creds);
-        if (!(DatabaseDescriptor.getAuthenticator() instanceof AllowAllAuthenticator))
+        if (ConfigHelper.getOutputKeyspaceUserName(context.getConfiguration()) != null)
+        {
+            Map<String, String> creds = new HashMap<String, String>();
+            creds.put(SimpleAuthenticator.USERNAME_KEY, ConfigHelper.getOutputKeyspaceUserName(context.getConfiguration()));
+            creds.put(SimpleAuthenticator.PASSWORD_KEY, ConfigHelper.getOutputKeyspacePassword(context.getConfiguration()));
+            AuthenticationRequest authRequest = new AuthenticationRequest(creds);
             client.login(authRequest);
+        }
         return client;
 
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=979440&r1=979439&r2=979440&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Mon Jul 26 21:01:36 2010
@@ -35,6 +35,7 @@ import org.apache.cassandra.config.Confi
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.clock.AbstractReconciler;
+import org.apache.cassandra.db.clock.TimestampReconciler;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.thrift.*;
@@ -48,6 +49,7 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 
 public class ColumnFamilyRecordReader extends RecordReader<byte[], SortedMap<byte[], IColumn>>
@@ -60,8 +62,6 @@ public class ColumnFamilyRecordReader ex
     private int batchRowCount; // fetch this many per batch
     private String cfName;
     private String keyspace;
-    private Configuration conf;
-    private AuthenticationRequest authRequest;
     private TSocket socket;
     private Cassandra.Client client;
 
@@ -94,18 +94,42 @@ public class ColumnFamilyRecordReader ex
     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
     {
         this.split = (ColumnFamilySplit) split;
-        conf = context.getConfiguration();
+        Configuration conf = context.getConfiguration();
         predicate = ConfigHelper.getInputSlicePredicate(conf);
         totalRowCount = ConfigHelper.getInputSplitSize(conf);
         batchRowCount = ConfigHelper.getRangeBatchSize(conf);
         cfName = ConfigHelper.getInputColumnFamily(conf);
         keyspace = ConfigHelper.getInputKeyspace(conf);
         
-        Map<String, String> creds = new HashMap<String, String>();
-        creds.put(SimpleAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf));
-        creds.put(SimpleAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
-        authRequest = new AuthenticationRequest(creds);
-        
+        try
+        {
+            // only need to connect once
+            if (socket != null && socket.isOpen())
+                return;
+
+            // create connection using thrift
+            String location = getLocation();
+            socket = new TSocket(location, ConfigHelper.getRpcPort(conf));
+            TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket));
+            client = new Cassandra.Client(binaryProtocol);
+            socket.open();
+
+            // log in
+            client.set_keyspace(keyspace);
+            if (ConfigHelper.getInputKeyspaceUserName(conf) != null)
+            {
+                Map<String, String> creds = new HashMap<String, String>();
+                creds.put(SimpleAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf));
+                creds.put(SimpleAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
+                AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+                client.login(authRequest);
+            }
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+
         iter = new RowIterator();
     }
     
@@ -117,6 +141,41 @@ public class ColumnFamilyRecordReader ex
         return true;
     }
 
+    // we don't use endpointsnitch since we are trying to support hadoop nodes that are
+    // not necessarily on Cassandra machines, too.  This should be adequate for single-DC clusters, at least.
+    private String getLocation()
+    {
+        InetAddress[] localAddresses;
+        try
+        {
+            localAddresses = InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress());
+        }
+        catch (UnknownHostException e)
+        {
+            throw new AssertionError(e);
+        }
+        for (InetAddress address : localAddresses)
+        {
+            for (String location : split.getLocations())
+            {
+                InetAddress locationAddress = null;
+                try
+                {
+                    locationAddress = InetAddress.getByName(location);
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new AssertionError(e);
+                }
+                if (address.equals(locationAddress))
+                {
+                    return location;
+                }
+            }
+        }
+        return split.getLocations()[0];
+    }
+
     private class RowIterator extends AbstractIterator<Pair<byte[], SortedMap<byte[], IColumn>>>
     {
         private List<KeySlice> rows;
@@ -134,7 +193,7 @@ public class ColumnFamilyRecordReader ex
                 partitioner = FBUtilities.newPartitioner(client.describe_partitioner());
                 Map<String, String> info = client.describe_keyspace(keyspace).get(cfName);
                 comparator = FBUtilities.getComparator(info.get("CompareWith"));
-                subComparator = FBUtilities.getComparator(info.get("CompareSubcolumnsWith"));
+                subComparator = info.get("CompareSubcolumnsWith") == null ? null : FBUtilities.getComparator(info.get("CompareSubcolumnsWith"));
             }
             catch (ConfigurationException e)
             {
@@ -158,16 +217,7 @@ public class ColumnFamilyRecordReader ex
             
             if (rows != null)
                 return;
-            
-            try
-            {
-                maybeConnect();
-            } 
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            } 
-            
+
             if (startToken == null)
             {
                 startToken = split.getStartToken();
@@ -208,68 +258,6 @@ public class ColumnFamilyRecordReader ex
                 throw new RuntimeException(e);
             }
         }
-        
-        /**
-         * Connect, log in and set up the correct comparator.
-         */
-        private void maybeConnect() throws InvalidRequestException, TException, AuthenticationException, 
-            AuthorizationException, NotFoundException, InstantiationException, IllegalAccessException, 
-            ClassNotFoundException, NoSuchFieldException
-        {
-            // only need to connect once
-            if (socket != null && socket.isOpen())
-                return;
-
-            // create connection using thrift
-            String location = getLocation();
-            socket = new TSocket(location, DatabaseDescriptor.getRpcPort());
-            TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
-            client = new Cassandra.Client(binaryProtocol);
-            socket.open();
-            
-            // log in
-            client.set_keyspace(keyspace);
-            if (!(DatabaseDescriptor.getAuthenticator() instanceof AllowAllAuthenticator))
-            {
-                client.login(authRequest);
-            }
-        }
-
-
-        // we don't use endpointsnitch since we are trying to support hadoop nodes that are
-        // not necessarily on Cassandra machines, too.  This should be adequate for single-DC clusters, at least.
-        private String getLocation()
-        {
-            InetAddress[] localAddresses = new InetAddress[0];
-            try
-            {
-                localAddresses = InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress());
-            }
-            catch (UnknownHostException e)
-            {
-                throw new AssertionError(e);
-            }
-            for (InetAddress address : localAddresses)
-            {
-                for (String location : split.getLocations())
-                {
-                    InetAddress locationAddress = null;
-                    try
-                    {
-                        locationAddress = InetAddress.getByName(location);
-                    }
-                    catch (UnknownHostException e)
-                    {
-                        throw new AssertionError(e);
-                    }
-                    if (address.equals(locationAddress))
-                    {
-                        return location;
-                    }
-                }
-            }
-            return split.getLocations()[0];
-        }
 
         /**
          * @return total number of rows read by this record reader
@@ -306,8 +294,8 @@ public class ColumnFamilyRecordReader ex
 
         private IColumn unthriftifySuper(SuperColumn super_column)
         {
-            ClockType clockType = DatabaseDescriptor.getClockType(keyspace, cfName);
-            AbstractReconciler reconciler = DatabaseDescriptor.getReconciler(keyspace, cfName);
+            ClockType clockType = ClockType.Timestamp; // TODO generalize
+            AbstractReconciler reconciler = new TimestampReconciler(); // TODO generalize
             org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator, clockType, reconciler);
             for (Column column : super_column.columns)
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=979440&r1=979439&r2=979440&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Mon Jul 26 21:01:36 2010
@@ -242,7 +242,7 @@ public class ConfigHelper
         return conf.get(OUTPUT_COLUMNFAMILY_CONFIG);
     }
 
-    public static int getThriftPort(Configuration conf)
+    public static int getRpcPort(Configuration conf)
     {
         String v = conf.get(THRIFT_PORT);
         return v == null ? DatabaseDescriptor.getRpcPort() : Integer.valueOf(v);