You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/11/10 15:43:13 UTC

[2/5] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e42164b6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e42164b6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e42164b6

Branch: refs/heads/trunk
Commit: e42164b63baf7c86ac64078f68e61097c4741711
Parents: 1e64a9d 177f607
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Tue Nov 10 14:39:16 2015 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Nov 10 14:39:16 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  | 94 ++++++++++----------
 2 files changed, 50 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e42164b6/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 33adefb,81ceb25..24e42c0
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,45 -1,5 +1,47 @@@
 -2.2.4
 +3.0.1
++Merged from 2.2:
+  * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 +Merged from 2.1:
 + * Reject counter writes in CQLSSTableWriter (CASSANDRA-10258)
 + * Remove superfluous COUNTER_MUTATION stage mapping (CASSANDRA-10605)
 +
 +
 +3.0
 + * Fix AssertionError while flushing memtable due to materialized views
 +   incorrectly inserting empty rows (CASSANDRA-10614)
 + * Store UDA initcond as CQL literal in the schema table, instead of a blob (CASSANDRA-10650)
 + * Don't use -1 for the position of partition key in schema (CASSANDRA-10491)
 + * Fix distinct queries in mixed version cluster (CASSANDRA-10573)
 + * Skip sstable on clustering in names query (CASSANDRA-10571)
 + * Remove value skipping as it breaks read-repair (CASSANDRA-10655)
 + * Fix bootstrapping with MVs (CASSANDRA-10621)
 + * Make sure EACH_QUORUM reads are using NTS (CASSANDRA-10584)
 + * Fix MV replica filtering for non-NetworkTopologyStrategy (CASSANDRA-10634)
 + * (Hadoop) fix CIF describeSplits() not handling 0 size estimates (CASSANDRA-10600)
 + * Fix reading of legacy sstables (CASSANDRA-10590)
 + * Use CQL type names in schema metadata tables (CASSANDRA-10365)
 + * Guard batchlog replay against integer division by zero (CASSANDRA-9223)
 + * Fix bug when adding a column to thrift with the same name than a primary key (CASSANDRA-10608)
 + * Add client address argument to IAuthenticator::newSaslNegotiator (CASSANDRA-8068)
 + * Fix implementation of LegacyLayout.LegacyBoundComparator (CASSANDRA-10602)
 + * Don't use 'names query' read path for counters (CASSANDRA-10572)
 + * Fix backward compatibility for counters (CASSANDRA-10470)
 + * Remove memory_allocator paramter from cassandra.yaml (CASSANDRA-10581,10628)
 + * Execute the metadata reload task of all registered indexes on CFS::reload (CASSANDRA-10604)
 + * Fix thrift cas operations with defined columns (CASSANDRA-10576)
 + * Fix PartitionUpdate.operationCount()for updates with static column operations (CASSANDRA-10606)
 + * Fix thrift get() queries with defined columns (CASSANDRA-10586)
 + * Fix marking of indexes as built and removed (CASSANDRA-10601)
 + * Skip initialization of non-registered 2i instances, remove Index::getIndexName (CASSANDRA-10595)
 + * Fix batches on multiple tables (CASSANDRA-10554)
 + * Ensure compaction options are validated when updating KeyspaceMetadata (CASSANDRA-10569)
 + * Flatten Iterator Transformation Hierarchy (CASSANDRA-9975)
 + * Remove token generator (CASSANDRA-5261)
 + * RolesCache should not be created for any authenticator that does not requireAuthentication (CASSANDRA-10562)
 + * Fix LogTransaction checking only a single directory for files (CASSANDRA-10421)
 + * Fix handling of range tombstones when reading old format sstables (CASSANDRA-10360)
 + * Aggregate with Initial Condition fails with C* 3.0 (CASSANDRA-10367)
 +Merged from 2.2:
   * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
   * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
   * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e42164b6/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 6b4caa5,14e24fb..23beba3
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@@ -108,31 -109,29 +108,29 @@@ class CqlRecordWriter extends RecordWri
      CqlRecordWriter(Configuration conf)
      {
          this.conf = conf;
 -        this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
 -        batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
 +        this.queueSize = conf.getInt(CqlOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
 +        batchThreshold = conf.getLong(CqlOutputFormat.BATCH_THRESHOLD, 32);
          this.clients = new HashMap<>();
  
-         try
+         try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf))
          {
              String keyspace = ConfigHelper.getOutputKeyspace(conf);
-             try (Session client = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace))
+             Session client = cluster.connect(keyspace);
+             ringCache = new NativeRingCache(conf);
+             if (client != null)
              {
-                 ringCache = new NativeRingCache(conf);
-                 if (client != null)
-                 {
-                     TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
-                     clusterColumns = tableMetadata.getClusteringColumns();
-                     partitionKeyColumns = tableMetadata.getPartitionKey();
- 
-                     String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
-                     if (cqlQuery.toLowerCase().startsWith("insert"))
-                         throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
-                     cql = appendKeyWhereClauses(cqlQuery);
-                 }
-                 else
-                 {
-                     throw new IllegalArgumentException("Invalid configuration specified " + conf);
-                 }
+                 TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
+                 clusterColumns = tableMetadata.getClusteringColumns();
+                 partitionKeyColumns = tableMetadata.getPartitionKey();
+ 
+                 String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
+                 if (cqlQuery.toLowerCase().startsWith("insert"))
+                     throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
+                 cql = appendKeyWhereClauses(cqlQuery);
+             }
+             else
+             {
+                 throw new IllegalArgumentException("Invalid configuration specified " + conf);
              }
          }
          catch (Exception e)
@@@ -298,34 -298,37 +297,37 @@@
                  while (true)
                  {
                      // send the mutation to the last-used endpoint.  first time through, this will NPE harmlessly.
-                     try
+                     if (session != null)
                      {
-                         int i = 0;
-                         PreparedStatement statement = preparedStatement(client);
-                         while (bindVariables != null)
+                         try
                          {
-                             BoundStatement boundStatement = new BoundStatement(statement);
-                             for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+                             int i = 0;
+                             PreparedStatement statement = preparedStatement(session);
+                             while (bindVariables != null)
                              {
-                                 boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+                                 BoundStatement boundStatement = new BoundStatement(statement);
+                                 for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+                                 {
+                                     boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+                                 }
+                                 session.execute(boundStatement);
+                                 i++;
+ 
+                                 if (i >= batchThreshold)
+                                     break;
+                                 bindVariables = queue.poll();
                              }
-                             client.execute(boundStatement);
-                             i++;
- 
-                             if (i >= batchThreshold)
-                                 break;
-                             bindVariables = queue.poll();
+                             break;
                          }
-                         break;
-                     }
-                     catch (Exception e)
-                     {
-                         closeInternal();
-                         if (!iter.hasNext())
+                         catch (Exception e)
                          {
-                             lastException = new IOException(e);
-                             break outer;
+                             closeInternal();
+                             if (!iter.hasNext())
+                             {
+                                 lastException = new IOException(e);
+                                 break outer;
+                             }
 -                        }                        
 +                        }
                      }
  
                      // attempt to connect to a different endpoint