Posted to commits@cassandra.apache.org by al...@apache.org on 2020/04/23 13:53:25 UTC

[cassandra] branch trunk updated: Fix CqlInputFormat regression from the switch to system.size_estimates

This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a8327eb  Fix CqlInputFormat regression from the switch to system.size_estimates
a8327eb is described below

commit a8327eb8868c8d9d03c253a88509ce64d2ac227b
Author: David Capwell <dc...@gmail.com>
AuthorDate: Thu Mar 12 10:29:37 2020 -0700

    Fix CqlInputFormat regression from the switch to system.size_estimates
    
    patch by David Capwell; reviewed by Aleksey Yeschenko and Brandon
    Williams for CASSANDRA-15637
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/db/SizeEstimatesRecorder.java |  48 +++-
 .../org/apache/cassandra/db/SystemKeyspace.java    |  79 +++++-
 .../cassandra/hadoop/cql3/CqlClientHelper.java     |  91 +++++++
 .../cassandra/hadoop/cql3/CqlInputFormat.java      | 298 ++++++++++++++++-----
 .../LimitedLocalNodeFirstLocalBalancingPolicy.java |  18 +-
 .../apache/cassandra/service/StorageService.java   |  30 ++-
 .../service/StorageServiceServerTest.java          |  39 +++
 8 files changed, 508 insertions(+), 96 deletions(-)
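
The regression is easiest to see with a small, self-contained sketch. The token values below mirror the hypothetical multi-DC example in the SizeEstimatesRecorder comment further down (they are not from a real cluster): once datacenters interleave their tokens, a node's primary range over the global ring collapses to a single-token sliver, while its primary range computed only over the local DC's tokens remains a useful chunk of the ring.

    import java.util.*;

    // Illustrative only: why interleaved multi-DC tokens shrink "primary" ranges.
    public class PrimaryRangeSketch
    {
        public static void main(String[] args)
        {
            List<Integer> dc1 = Arrays.asList(0, 10, 20, 30);
            List<Integer> dc2 = Arrays.asList(1, 11, 21, 31);
            List<Integer> dc3 = Arrays.asList(2, 12, 22, 32);

            // Global ring: every token from every DC, sorted.
            List<Integer> ring = new ArrayList<>();
            ring.addAll(dc1);
            ring.addAll(dc2);
            ring.addAll(dc3);
            Collections.sort(ring);

            // Primary range ending at token 10 (a DC1 node) over the global ring: (2, 10]
            System.out.println("global primary range for token 10: (" + previous(ring, 10) + ", 10]");

            // Local primary range for the same node, computed only over DC1's tokens: (0, 10]
            System.out.println("local primary range for token 10:  (" + previous(dc1, 10) + ", 10]");
        }

        // Token immediately preceding 'token' on the (wrapping) ring.
        private static int previous(List<Integer> sortedTokens, int token)
        {
            int idx = Collections.binarySearch(sortedTokens, token);
            return idx == 0 ? sortedTokens.get(sortedTokens.size() - 1) : sortedTokens.get(idx - 1);
        }
    }

This is why the patch records estimates under two range_type values ('primary' and 'local_primary') in the new system.table_estimates table, and why CqlInputFormat now sizes its splits from the local primary ranges.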

diff --git a/CHANGES.txt b/CHANGES.txt
index d7e764c..ab8a7eb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha4
+ * Fix CqlInputFormat regression from the switch to system.size_estimates (CASSANDRA-15637)
  * Allow sending Entire SSTables over SSL (CASSANDRA-15740)
  * Fix CQLSH UTF-8 encoding issue for Python 2/3 compatibility (CASSANDRA-15739)
  * Fix batch statement preparation when multiple tables and parameters are used (CASSANDRA-15730)
diff --git a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
index d61c297..fe38d64 100644
--- a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
+++ b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
@@ -70,12 +70,39 @@ public class SizeEstimatesRecorder extends SchemaChangeListener implements Runna
 
         for (Keyspace keyspace : Keyspace.nonLocalStrategy())
         {
-            Collection<Range<Token>> localRanges = StorageService.instance.getPrimaryRangesForEndpoint(keyspace.getName(),
-                    FBUtilities.getBroadcastAddressAndPort());
+            // In tools the call to describe_splits_ex() used to be coupled with the call to describe_local_ring() so
+            // most access was for the local primary range; after creating the size_estimates table this was changed
+            // to be the primary range.
+            // In a multi-DC setup it's not uncommon for the local ring to be offset by 1 for the next DC; example:
+            // DC1: [0, 10, 20, 30]
+            // DC2: [1, 11, 21, 31]
+            // DC3: [2, 12, 22, 32]
+            // When working with the primary ring we have:
+            // [0, 1, 2, 10, 11, 12, 20, 21, 22, 30, 31, 32]
+            // this then leads to primary ranges with a single token in them, which causes the estimates to be less useful.
+            // Since only one range was published, some tools make this assumption; for this reason we can't publish
+            // all ranges (including the replica ranges), nor can we keep backwards compatibility and publish the primary
+            // range.  If we publish multiple ranges, downstream integrations may start to see duplicate data.
+            // See CASSANDRA-15637
+            Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRanges(keyspace.getName());
+            Collection<Range<Token>> localPrimaryRanges = StorageService.instance.getLocalPrimaryRange();
+            boolean rangesAreEqual = primaryRanges.equals(localPrimaryRanges);
             for (ColumnFamilyStore table : keyspace.getColumnFamilyStores())
             {
                 long start = System.nanoTime();
-                recordSizeEstimates(table, localRanges);
+
+                // compute estimates for primary ranges for backwards compatibility
+                Map<Range<Token>, Pair<Long, Long>> estimates = computeSizeEstimates(table, primaryRanges);
+                SystemKeyspace.updateSizeEstimates(table.metadata.keyspace, table.metadata.name, estimates);
+                SystemKeyspace.updateTableEstimates(table.metadata.keyspace, table.metadata.name, SystemKeyspace.TABLE_ESTIMATES_TYPE_PRIMARY, estimates);
+
+                if (!rangesAreEqual)
+                {
+                    // compute estimate for local primary range
+                    estimates = computeSizeEstimates(table, localPrimaryRanges);
+                }
+                SystemKeyspace.updateTableEstimates(table.metadata.keyspace, table.metadata.name, SystemKeyspace.TABLE_ESTIMATES_TYPE_LOCAL_PRIMARY, estimates);
+
                 long passed = System.nanoTime() - start;
                 if (logger.isTraceEnabled())
                     logger.trace("Spent {} milliseconds on estimating {}.{} size",
@@ -87,11 +114,11 @@ public class SizeEstimatesRecorder extends SchemaChangeListener implements Runna
     }
 
     @SuppressWarnings("resource")
-    private void recordSizeEstimates(ColumnFamilyStore table, Collection<Range<Token>> localRanges)
+    private static Map<Range<Token>, Pair<Long, Long>> computeSizeEstimates(ColumnFamilyStore table, Collection<Range<Token>> ranges)
     {
         // for each local primary range, estimate (crudely) mean partition size and partitions count.
-        Map<Range<Token>, Pair<Long, Long>> estimates = new HashMap<>(localRanges.size());
-        for (Range<Token> localRange : localRanges)
+        Map<Range<Token>, Pair<Long, Long>> estimates = new HashMap<>(ranges.size());
+        for (Range<Token> localRange : ranges)
         {
             for (Range<Token> unwrappedRange : localRange.unwrap())
             {
@@ -124,11 +151,10 @@ public class SizeEstimatesRecorder extends SchemaChangeListener implements Runna
             }
         }
 
-        // atomically update the estimates.
-        SystemKeyspace.updateSizeEstimates(table.metadata.keyspace, table.metadata.name, estimates);
+        return estimates;
     }
 
-    private long estimatePartitionsCount(Collection<SSTableReader> sstables, Range<Token> range)
+    private static long estimatePartitionsCount(Collection<SSTableReader> sstables, Range<Token> range)
     {
         long count = 0;
         for (SSTableReader sstable : sstables)
@@ -136,7 +162,7 @@ public class SizeEstimatesRecorder extends SchemaChangeListener implements Runna
         return count;
     }
 
-    private long estimateMeanPartitionSize(Collection<SSTableReader> sstables)
+    private static long estimateMeanPartitionSize(Collection<SSTableReader> sstables)
     {
         long sum = 0, count = 0;
         for (SSTableReader sstable : sstables)
@@ -151,6 +177,6 @@ public class SizeEstimatesRecorder extends SchemaChangeListener implements Runna
     @Override
     public void onDropTable(String keyspace, String table)
     {
-        SystemKeyspace.clearSizeEstimates(keyspace, table);
+        SystemKeyspace.clearEstimates(keyspace, table);
     }
 }
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index c427c8f..eb31f2d 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -100,7 +100,9 @@ public final class SystemKeyspace
     public static final String PEER_EVENTS_V2 = "peer_events_v2";
     public static final String COMPACTION_HISTORY = "compaction_history";
     public static final String SSTABLE_ACTIVITY = "sstable_activity";
-    public static final String SIZE_ESTIMATES = "size_estimates";
+    public static final String TABLE_ESTIMATES = "table_estimates";
+    public static final String TABLE_ESTIMATES_TYPE_PRIMARY = "primary";
+    public static final String TABLE_ESTIMATES_TYPE_LOCAL_PRIMARY = "local_primary";
     public static final String AVAILABLE_RANGES_V2 = "available_ranges_v2";
     public static final String TRANSFERRED_RANGES_V2 = "transferred_ranges_v2";
     public static final String VIEW_BUILDS_IN_PROGRESS = "view_builds_in_progress";
@@ -112,6 +114,7 @@ public final class SystemKeyspace
     @Deprecated public static final String LEGACY_PEER_EVENTS = "peer_events";
     @Deprecated public static final String LEGACY_TRANSFERRED_RANGES = "transferred_ranges";
     @Deprecated public static final String LEGACY_AVAILABLE_RANGES = "available_ranges";
+    @Deprecated public static final String LEGACY_SIZE_ESTIMATES = "size_estimates";
 
 
     public static final TableMetadata Batches =
@@ -237,9 +240,10 @@ public final class SystemKeyspace
                 + "PRIMARY KEY ((keyspace_name, columnfamily_name, generation)))")
                 .build();
 
-    private static final TableMetadata SizeEstimates =
-        parse(SIZE_ESTIMATES,
-                "per-table primary range size estimates",
+    @Deprecated
+    private static final TableMetadata LegacySizeEstimates =
+        parse(LEGACY_SIZE_ESTIMATES,
+              "per-table primary range size estimates, table is deprecated in favor of " + TABLE_ESTIMATES,
                 "CREATE TABLE %s ("
                 + "keyspace_name text,"
                 + "table_name text,"
@@ -250,6 +254,20 @@ public final class SystemKeyspace
                 + "PRIMARY KEY ((keyspace_name), table_name, range_start, range_end))")
                 .build();
 
+    private static final TableMetadata TableEstimates =
+        parse(TABLE_ESTIMATES,
+              "per-table range size estimates",
+              "CREATE TABLE %s ("
+               + "keyspace_name text,"
+               + "table_name text,"
+               + "range_type text,"
+               + "range_start text,"
+               + "range_end text,"
+               + "mean_partition_size bigint,"
+               + "partitions_count bigint,"
+               + "PRIMARY KEY ((keyspace_name), table_name, range_type, range_start, range_end))")
+               .build();
+
     private static final TableMetadata AvailableRangesV2 =
     parse(AVAILABLE_RANGES_V2,
           "available keyspace/ranges during bootstrap/replace that are ready to be served",
@@ -397,7 +415,8 @@ public final class SystemKeyspace
                          LegacyPeerEvents,
                          CompactionHistory,
                          SSTableActivity,
-                         SizeEstimates,
+                         LegacySizeEstimates,
+                         TableEstimates,
                          AvailableRangesV2,
                          LegacyAvailableRanges,
                          TransferredRangesV2,
@@ -1248,40 +1267,74 @@ public final class SystemKeyspace
     public static void updateSizeEstimates(String keyspace, String table, Map<Range<Token>, Pair<Long, Long>> estimates)
     {
         long timestamp = FBUtilities.timestampMicros();
-        PartitionUpdate.Builder update = new PartitionUpdate.Builder(SizeEstimates, UTF8Type.instance.decompose(keyspace), SizeEstimates.regularAndStaticColumns(), estimates.size());
+        int nowInSec = FBUtilities.nowInSeconds();
+        PartitionUpdate.Builder update = new PartitionUpdate.Builder(LegacySizeEstimates, UTF8Type.instance.decompose(keyspace), LegacySizeEstimates.regularAndStaticColumns(), estimates.size());
         // delete all previous values with a single range tombstone.
+        update.add(new RangeTombstone(Slice.make(LegacySizeEstimates.comparator, table), new DeletionTime(timestamp - 1, nowInSec)));
+
+        // add a CQL row for each primary token range.
+        for (Map.Entry<Range<Token>, Pair<Long, Long>> entry : estimates.entrySet())
+        {
+            Range<Token> range = entry.getKey();
+            Pair<Long, Long> values = entry.getValue();
+            update.add(Rows.simpleBuilder(LegacySizeEstimates, table, range.left.toString(), range.right.toString())
+                           .timestamp(timestamp)
+                           .add("partitions_count", values.left)
+                           .add("mean_partition_size", values.right)
+                           .build());
+        }
+        new Mutation(update.build()).apply();
+    }
+
+    /**
+     * Writes the current partition count and size estimates into table_estimates
+     */
+    public static void updateTableEstimates(String keyspace, String table, String type, Map<Range<Token>, Pair<Long, Long>> estimates)
+    {
+        long timestamp = FBUtilities.timestampMicros();
         int nowInSec = FBUtilities.nowInSeconds();
-        update.add(new RangeTombstone(Slice.make(SizeEstimates.comparator, table), new DeletionTime(timestamp - 1, nowInSec)));
+        PartitionUpdate.Builder update = new PartitionUpdate.Builder(TableEstimates, UTF8Type.instance.decompose(keyspace), TableEstimates.regularAndStaticColumns(), estimates.size());
+
+        // delete all previous values with a single range tombstone.
+        update.add(new RangeTombstone(Slice.make(TableEstimates.comparator, table, type), new DeletionTime(timestamp - 1, nowInSec)));
 
         // add a CQL row for each primary token range.
         for (Map.Entry<Range<Token>, Pair<Long, Long>> entry : estimates.entrySet())
         {
             Range<Token> range = entry.getKey();
             Pair<Long, Long> values = entry.getValue();
-            update.add(Rows.simpleBuilder(SizeEstimates, table, range.left.toString(), range.right.toString())
+            update.add(Rows.simpleBuilder(TableEstimates, table, type, range.left.toString(), range.right.toString())
                            .timestamp(timestamp)
                            .add("partitions_count", values.left)
                            .add("mean_partition_size", values.right)
                            .build());
         }
+
         new Mutation(update.build()).apply();
     }
 
+
     /**
      * Clears size estimates for a table (on table drop)
      */
-    public static void clearSizeEstimates(String keyspace, String table)
+    public static void clearEstimates(String keyspace, String table)
     {
-        String cql = format("DELETE FROM %s WHERE keyspace_name = ? AND table_name = ?", SizeEstimates.toString());
+        String cqlFormat = "DELETE FROM %s WHERE keyspace_name = ? AND table_name = ?";
+        String cql = format(cqlFormat, LegacySizeEstimates.toString());
+        executeInternal(cql, keyspace, table);
+        cql = String.format(cqlFormat, TableEstimates.toString());
         executeInternal(cql, keyspace, table);
     }
 
     /**
      * Clears size estimates for a keyspace (used to manually clean when we miss a keyspace drop)
      */
-    public static void clearSizeEstimates(String keyspace)
+    public static void clearEstimates(String keyspace)
     {
-        String cql = String.format("DELETE FROM %s.%s WHERE keyspace_name = ?", SchemaConstants.SYSTEM_KEYSPACE_NAME, SIZE_ESTIMATES);
+        String cqlFormat = "DELETE FROM %s WHERE keyspace_name = ?";
+        String cql = String.format(cqlFormat, LegacySizeEstimates.toString());
+        executeInternal(cql, keyspace);
+        cql = String.format(cqlFormat, TableEstimates.toString());
         executeInternal(cql, keyspace);
     }
 
@@ -1291,7 +1344,7 @@ public final class SystemKeyspace
     public static synchronized SetMultimap<String, String> getTablesWithSizeEstimates()
     {
         SetMultimap<String, String> keyspaceTableMap = HashMultimap.create();
-        String cql = String.format("SELECT keyspace_name, table_name FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SIZE_ESTIMATES);
+        String cql = String.format("SELECT keyspace_name, table_name FROM %s", TableEstimates.toString(), TABLE_ESTIMATES_TYPE_PRIMARY);
         UntypedResultSet rs = executeInternal(cql);
         for (UntypedResultSet.Row row : rs)
         {
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlClientHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlClientHelper.java
new file mode 100644
index 0000000..d154243
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlClientHelper.java
@@ -0,0 +1,91 @@
+package org.apache.cassandra.hadoop.cql3;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.Token;
+import com.datastax.driver.core.TokenRange;
+
+public class CqlClientHelper
+{
+    private CqlClientHelper()
+    {
+    }
+
+    public static Map<TokenRange, List<Host>> getLocalPrimaryRangeForDC(String keyspace, Metadata metadata, String targetDC)
+    {
+        Objects.requireNonNull(keyspace, "keyspace");
+        Objects.requireNonNull(metadata, "metadata");
+        Objects.requireNonNull(targetDC, "targetDC");
+
+        // In 2.1 the logic was to take a set of seed nodes and use them to query
+        // client.describe_local_ring(keyspace) -> List<TokenRange>; this should include all nodes in the local DC.
+        // TokenRange contained the endpoints in order, so .endpoints.get(0) is the primary owner.
+        // The client does not have a similar API; instead it returns Set<Host>.  To replicate this we first need
+        // to compute the primary owners, then add in the replicas.
+
+        List<Token> tokens = new ArrayList<>();
+        Map<Token, Host> tokenToHost = new HashMap<>();
+        for (Host host : metadata.getAllHosts())
+        {
+            if (!targetDC.equals(host.getDatacenter()))
+                continue;
+
+            for (Token token : host.getTokens())
+            {
+                Host previous = tokenToHost.putIfAbsent(token, host);
+                if (previous != null)
+                    throw new IllegalStateException("Two hosts share the same token; hosts " + host.getHostId() + ":"
+                                                    + host.getTokens() + ", " + previous.getHostId() + ":" + previous.getTokens());
+                tokens.add(token);
+            }
+        }
+        Collections.sort(tokens);
+
+        Map<TokenRange, List<Host>> rangeToReplicas = new HashMap<>();
+
+        // The first token in the ring uses the last token as its 'start'; handle this here to simplify the loop
+        Token start = tokens.get(tokens.size() - 1);
+        Token end = tokens.get(0);
+
+        addRange(keyspace, metadata, tokenToHost, rangeToReplicas, start, end);
+        for (int i = 1; i < tokens.size(); i++)
+        {
+            start = tokens.get(i - 1);
+            end = tokens.get(i);
+
+            addRange(keyspace, metadata, tokenToHost, rangeToReplicas, start, end);
+        }
+
+        return rangeToReplicas;
+    }
+
+    private static void addRange(String keyspace,
+                                 Metadata metadata,
+                                 Map<Token, Host> tokenToHost,
+                                 Map<TokenRange, List<Host>> rangeToReplicas,
+                                 Token start, Token end)
+    {
+        Host host = tokenToHost.get(end);
+        String dc = host.getDatacenter();
+
+        TokenRange range = metadata.newTokenRange(start, end);
+        List<Host> replicas = new ArrayList<>();
+        replicas.add(host);
+        // get all the replicas for the specific DC
+        for (Host replica : metadata.getReplicas(keyspace, range))
+        {
+            if (dc.equals(replica.getDatacenter()) && !host.equals(replica))
+                replicas.add(replica);
+        }
+        List<Host> previous = rangeToReplicas.put(range, replicas);
+        if (previous != null)
+            throw new IllegalStateException("Two hosts (" + host + ", " + previous + ") map to the same token range: " + range);
+    }
+}
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
index eae1fa2..262965f 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.hadoop.cql3;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.*;
 
@@ -27,9 +28,16 @@ import com.datastax.driver.core.Metadata;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.TokenRange;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 
+import com.datastax.driver.core.exceptions.InvalidQueryException;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.InputSplit;
@@ -130,10 +138,15 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
         ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
         List<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();
 
-        try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf);
+        String[] inputInitialAddress = ConfigHelper.getInputInitialAddress(conf).split(",");
+        try (Cluster cluster = CqlConfigHelper.getInputCluster(inputInitialAddress, conf);
              Session session = cluster.connect())
         {
-            List<Future<List<org.apache.hadoop.mapreduce.InputSplit>>> splitfutures = new ArrayList<>();
+            List<SplitFuture> splitfutures = new ArrayList<>();
+            //TODO if the job range is defined and does not perfectly match tokens, then the logic will be unable to get estimates since they are pre-computed
+            // tokens: [0, 10, 20]
+            // job range: [0, 10) - able to get estimate
+            // job range: [5, 15) - unable to get estimate
             Pair<String, String> jobKeyRange = ConfigHelper.getInputKeyRange(conf);
             Range<Token> jobRange = null;
             if (jobKeyRange != null)
@@ -145,14 +158,18 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
             Metadata metadata = cluster.getMetadata();
 
             // canonical ranges and nodes holding replicas
-            Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(keyspace, metadata);
-
+            Map<TokenRange, List<Host>> masterRangeNodes = getRangeMap(keyspace, metadata, getTargetDC(metadata, inputInitialAddress));
             for (TokenRange range : masterRangeNodes.keySet())
             {
                 if (jobRange == null)
                 {
-                    // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
-                    splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf, session)));
+                    for (TokenRange unwrapped : range.unwrap())
+                    {
+                        // for each tokenRange, pick a live owner and ask it for the byte-sized splits
+                        SplitFuture task = new SplitFuture(new SplitCallable(unwrapped, masterRangeNodes.get(range), conf, session));
+                        executor.submit(task);
+                        splitfutures.add(task);
+                    }
                 }
                 else
                 {
@@ -161,24 +178,61 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
                     {
                         for (TokenRange intersection: range.intersectWith(jobTokenRange))
                         {
-                            // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
-                            splitfutures.add(executor.submit(new SplitCallable(intersection,  masterRangeNodes.get(range), conf, session)));
+                            for (TokenRange unwrapped : intersection.unwrap())
+                            {
+                                // for each tokenRange, pick a live owner and ask it for the byte-sized splits
+                                SplitFuture task = new SplitFuture(new SplitCallable(unwrapped,  masterRangeNodes.get(range), conf, session));
+                                executor.submit(task);
+                                splitfutures.add(task);
+                            }
                         }
                     }
                 }
             }
 
             // wait until we have all the results back
-            for (Future<List<org.apache.hadoop.mapreduce.InputSplit>> futureInputSplits : splitfutures)
+            List<SplitFuture> failedTasks = new ArrayList<>();
+            int maxSplits = 0;
+            long expectedPartionsForFailedRanges = 0;
+            for (SplitFuture task : splitfutures)
             {
                 try
                 {
-                    splits.addAll(futureInputSplits.get());
+                    List<ColumnFamilySplit> tokenRangeSplits = task.get();
+                    if (tokenRangeSplits.size() > maxSplits)
+                    {
+                        maxSplits = tokenRangeSplits.size();
+                        expectedPartionsForFailedRanges = tokenRangeSplits.get(0).getLength();
+                    }
                 }
                 catch (Exception e)
                 {
-                    throw new IOException("Could not get input splits", e);
+                    failedTasks.add(task);
+                }
+            }
+            // The estimate is only stored on a single host; if that host is down then we can not get the estimate.
+            // It is more than likely that a single host could be "too large" for one split, but there is no way of
+            // knowing!
+            // This logic attempts to guess the estimate from all the successful ranges
+            if (!failedTasks.isEmpty())
+            {
+                // if every split failed this will be 0
+                if (maxSplits == 0)
+                    throwAllSplitsFailed(failedTasks);
+                for (SplitFuture task : failedTasks)
+                {
+                    try
+                    {
+                        // the task failed, so this should throw
+                        task.get();
+                    }
+                    catch (Exception cause)
+                    {
+                        logger.warn("Unable to get estimate for {}, the host {} had an exception; falling back to default estimate", task.splitCallable.tokenRange, task.splitCallable.hosts.get(0), cause);
+                    }
                 }
+                for (SplitFuture task : failedTasks)
+                    splits.addAll(toSplit(task.splitCallable.hosts, splitTokenRange(task.splitCallable.tokenRange, maxSplits, expectedPartionsForFailedRanges)));
             }
         }
         finally
@@ -191,19 +245,73 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
         return splits;
     }
 
-    private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range)
+    private static IllegalStateException throwAllSplitsFailed(List<SplitFuture> failedTasks)
     {
-        return metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)),
-                metadata.newToken(partitioner.getTokenFactory().toString(range.right)));
+        IllegalStateException exception = new IllegalStateException("No successful tasks found");
+        for (SplitFuture task : failedTasks)
+        {
+            try
+            {
+                // the task failed, so this should throw
+                task.get();
+            }
+            catch (Exception cause)
+            {
+                exception.addSuppressed(cause);
+            }
+        }
+        throw exception;
     }
 
-    private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf, Session session)
+    private static String getTargetDC(Metadata metadata, String[] inputInitialAddress)
+    {
+        BiMultiValMap<InetAddress, String> addressToDc = new BiMultiValMap<>();
+        Multimap<String, InetAddress> dcToAddresses = addressToDc.inverse();
+
+        // the only way to match is off the broadcast addresses, so for all hosts do an existence check
+        Set<InetAddress> addresses = new HashSet<>(inputInitialAddress.length);
+        for (String inputAddress : inputInitialAddress)
+            addresses.addAll(parseAddress(inputAddress));
+
+        for (Host host : metadata.getAllHosts())
+        {
+            InetAddress address = host.getBroadcastAddress();
+            if (addresses.contains(address))
+                addressToDc.put(address, host.getDatacenter());
+        }
+
+        switch (dcToAddresses.keySet().size())
+        {
+            case 1:
+                return Iterables.getOnlyElement(dcToAddresses.keySet());
+            case 0:
+                throw new IllegalStateException("Input addresses could not be used to find DC; none match client metadata");
+            default:
+                // Multiple DCs found; attempt to pick the first based off the address list. This mimics the 2.1
+                // behavior, which would connect to nodes in order, and the first node it could connect to determined
+                // the local DC to use; since the client abstracts this, we rely on existence as a proxy for connect.
+                for (String inputAddress : inputInitialAddress)
+                {
+                    for (InetAddress add : parseAddress(inputAddress))
+                    {
+                        String dc = addressToDc.get(add);
+                        // possible the address isn't in the cluster and the client dropped it, so ignore null
+                        if (dc != null)
+                            return dc;
+                    }
+                }
+                // somehow we were able to connect to the cluster and find multiple DCs by matching, and yet couldn't
+                // match again...
+                throw new AssertionError("Unable to infer datacenter from initial addresses; multiple datacenters found "
+                                         + dcToAddresses.keySet() + ", should only use addresses from one datacenter");
+        }
+    }
+
+    private static List<InetAddress> parseAddress(String str)
     {
-        int splitSize = ConfigHelper.getInputSplitSize(conf);
-        int splitSizeMb = ConfigHelper.getInputSplitSizeInMb(conf);
         try
         {
-            return describeSplits(keyspace, cfName, range, splitSize, splitSizeMb, session);
+            return Arrays.asList(InetAddress.getAllByName(str));
         }
         catch (Exception e)
         {
@@ -211,22 +319,35 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
         }
     }
 
-    private Map<TokenRange, Set<Host>> getRangeMap(String keyspace, Metadata metadata)
+    private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range)
+    {
+        return metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)),
+                metadata.newToken(partitioner.getTokenFactory().toString(range.right)));
+    }
+
+    private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Host host, Configuration conf, Session session)
     {
-        return metadata.getTokenRanges()
-                       .stream()
-                       .collect(toMap(p -> p, p -> metadata.getReplicas('"' + keyspace + '"', p)));
+        int splitSize = ConfigHelper.getInputSplitSize(conf);
+        int splitSizeMb = ConfigHelper.getInputSplitSizeInMb(conf);
+        return describeSplits(keyspace, cfName, range, host, splitSize, splitSizeMb, session);
     }
 
-    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, int splitSizeMb, Session session)
+    private static Map<TokenRange, List<Host>> getRangeMap(String keyspace, Metadata metadata, String targetDC)
     {
-        String query = String.format("SELECT mean_partition_size, partitions_count " +
-                                     "FROM %s.%s " +
-                                     "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
-                                     SchemaConstants.SYSTEM_KEYSPACE_NAME,
-                                     SystemKeyspace.SIZE_ESTIMATES);
+        return CqlClientHelper.getLocalPrimaryRangeForDC(keyspace, metadata, targetDC);
+    }
 
-        ResultSet resultSet = session.execute(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString());
+    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, Host host, int splitSize, int splitSizeMb, Session session)
+    {
+        // In 2.1 the host list was walked in order (only moving to the next host on IOException) and calls
+        // org.apache.cassandra.service.StorageService.getSplits(java.lang.String, java.lang.String, org.apache.cassandra.dht.Range<org.apache.cassandra.dht.Token>, int)
+        // that call computes totalRowCountEstimate (used to compute the number of splits), then splits the ring based off those estimates.
+        //
+        // The main difference is that the estimates in 2.1 were computed from the data, so any replica could answer.
+        // In 3.0 we rely on the CQL query below, which is local and only computes estimates for the primary range; this
+        // puts us in a sticky spot: if the node fails, what do we do?  3.0 behavior only matches 2.1 IFF all
+        // nodes are up and healthy.
+        ResultSet resultSet = queryTableEstimates(session, host, keyspace, table, tokenRange);
 
         Row row = resultSet.one();
 
@@ -250,14 +371,47 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
         if (splitCount == 0)
         {
             Map<TokenRange, Long> wrappedTokenRange = new HashMap<>();
-            wrappedTokenRange.put(tokenRange, (long) 128);
+            wrappedTokenRange.put(tokenRange, partitionCount == 0 ? 128L : partitionCount);
             return wrappedTokenRange;
         }
 
+        return splitTokenRange(tokenRange, splitCount, partitionCount / splitCount);
+    }
+
+    private static ResultSet queryTableEstimates(Session session, Host host, String keyspace, String table, TokenRange tokenRange)
+    {
+        try
+        {
+            String query = String.format("SELECT mean_partition_size, partitions_count " +
+                                         "FROM %s.%s " +
+                                         "WHERE keyspace_name = ? AND table_name = ? AND range_type = '%s' AND range_start = ? AND range_end = ?",
+                                         SchemaConstants.SYSTEM_KEYSPACE_NAME,
+                                         SystemKeyspace.TABLE_ESTIMATES,
+                                         SystemKeyspace.TABLE_ESTIMATES_TYPE_LOCAL_PRIMARY);
+            Statement stmt = new SimpleStatement(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString()).setHost(host);
+            return session.execute(stmt);
+        }
+        catch (InvalidQueryException e)
+        {
+            // if the table doesn't exist, fall back to the old table.  This is likely to return no records in a multi
+            // DC setup, but should work fine in a single DC setup.
+            String query = String.format("SELECT mean_partition_size, partitions_count " +
+                                         "FROM %s.%s " +
+                                         "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
+                                         SchemaConstants.SYSTEM_KEYSPACE_NAME,
+                                         SystemKeyspace.LEGACY_SIZE_ESTIMATES);
+
+            Statement stmt = new SimpleStatement(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString()).setHost(host);
+            return session.execute(stmt);
+        }
+    }
+
+    private static Map<TokenRange, Long> splitTokenRange(TokenRange tokenRange, int splitCount, long partitionCount)
+    {
         List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount);
         Map<TokenRange, Long> rangesWithLength = Maps.newHashMapWithExpectedSize(splitRanges.size());
         for (TokenRange range : splitRanges)
-            rangesWithLength.put(range, partitionCount/splitCount);
+            rangesWithLength.put(range, partitionCount);
 
         return rangesWithLength;
     }
@@ -277,56 +431,70 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
      * Gets a token tokenRange and splits it up according to the suggested
      * size into input splits that Hadoop can use.
      */
-    class SplitCallable implements Callable<List<org.apache.hadoop.mapreduce.InputSplit>>
+    class SplitCallable implements Callable<List<ColumnFamilySplit>>
     {
 
         private final TokenRange tokenRange;
-        private final Set<Host> hosts;
+        private final List<Host> hosts;
         private final Configuration conf;
         private final Session session;
 
-        public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf, Session session)
+        public SplitCallable(TokenRange tokenRange, List<Host> hosts, Configuration conf, Session session)
         {
-            this.tokenRange = tr;
+            Preconditions.checkArgument(!hosts.isEmpty(), "hosts list requires at least 1 host but was empty");
+            this.tokenRange = tokenRange;
             this.hosts = hosts;
             this.conf = conf;
             this.session = session;
         }
 
-        public List<org.apache.hadoop.mapreduce.InputSplit> call() throws Exception
+        public List<ColumnFamilySplit> call() throws Exception
         {
-            ArrayList<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();
-            Map<TokenRange, Long> subSplits;
-            subSplits = getSubSplits(keyspace, cfName, tokenRange, conf, session);
-            // turn the sub-ranges into InputSplits
-            String[] endpoints = new String[hosts.size()];
+            Map<TokenRange, Long> subSplits = getSubSplits(keyspace, cfName, tokenRange, hosts.get(0), conf, session);
+            return toSplit(hosts, subSplits);
+        }
 
-            // hadoop needs hostname, not ip
-            int endpointIndex = 0;
-            for (Host endpoint : hosts)
-                endpoints[endpointIndex++] = endpoint.getAddress().getHostName();
+    }
 
-            boolean partitionerIsOpp = partitioner instanceof OrderPreservingPartitioner || partitioner instanceof ByteOrderedPartitioner;
+    private static class SplitFuture extends FutureTask<List<ColumnFamilySplit>>
+    {
+        private final SplitCallable splitCallable;
 
-            for (Map.Entry<TokenRange, Long> subSplitEntry : subSplits.entrySet())
-            {
-                List<TokenRange> ranges = subSplitEntry.getKey().unwrap();
-                for (TokenRange subrange : ranges)
-                {
-                    ColumnFamilySplit split =
-                            new ColumnFamilySplit(
-                                    partitionerIsOpp ?
-                                            subrange.getStart().toString().substring(2) : subrange.getStart().toString(),
-                                    partitionerIsOpp ?
-                                            subrange.getEnd().toString().substring(2) : subrange.getEnd().toString(),
-                                    subSplitEntry.getValue(),
-                                    endpoints);
-
-                    logger.trace("adding {}", split);
-                    splits.add(split);
-                }
-            }
-            return splits;
+        SplitFuture(SplitCallable splitCallable)
+        {
+            super(splitCallable);
+            this.splitCallable = splitCallable;
         }
     }
+
+    private List<ColumnFamilySplit> toSplit(List<Host> hosts, Map<TokenRange, Long> subSplits)
+    {
+        // turn the sub-ranges into InputSplits
+        String[] endpoints = new String[hosts.size()];
+
+        // hadoop needs hostname, not ip
+        int endpointIndex = 0;
+        for (Host endpoint : hosts)
+            endpoints[endpointIndex++] = endpoint.getAddress().getHostName();
+
+        boolean partitionerIsOpp = partitioner instanceof OrderPreservingPartitioner || partitioner instanceof ByteOrderedPartitioner;
+
+        ArrayList<ColumnFamilySplit> splits = new ArrayList<>();
+        for (Map.Entry<TokenRange, Long> subSplitEntry : subSplits.entrySet())
+        {
+            TokenRange subrange = subSplitEntry.getKey();
+            ColumnFamilySplit split =
+                new ColumnFamilySplit(
+                    partitionerIsOpp ?
+                        subrange.getStart().toString().substring(2) : subrange.getStart().toString(),
+                    partitionerIsOpp ?
+                        subrange.getEnd().toString().substring(2) : subrange.getEnd().toString(),
+                    subSplitEntry.getValue(),
+                    endpoints);
+
+            logger.trace("adding {}", split);
+            splits.add(split);
+        }
+        return splits;
+    }
 }
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java b/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
index 256da2d..59b4eca 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
@@ -56,6 +56,7 @@ class LimitedLocalNodeFirstLocalBalancingPolicy implements LoadBalancingPolicy
     private final CopyOnWriteArraySet<Host> liveReplicaHosts = new CopyOnWriteArraySet<>();
 
     private final Set<InetAddress> replicaAddresses = new HashSet<>();
+    private final Set<String> allowedDCs = new CopyOnWriteArraySet<>();
 
     public LimitedLocalNodeFirstLocalBalancingPolicy(String[] replicas)
     {
@@ -78,15 +79,22 @@ class LimitedLocalNodeFirstLocalBalancingPolicy implements LoadBalancingPolicy
     @Override
     public void init(Cluster cluster, Collection<Host> hosts)
     {
-        List<Host> replicaHosts = new ArrayList<>();
+        // first find which DCs the user defined
+        Set<String> dcs = new HashSet<>();
         for (Host host : hosts)
         {
             if (replicaAddresses.contains(host.getAddress()))
-            {
+                dcs.add(host.getDatacenter());
+        }
+        // filter to all nodes within the targeted DCs
+        List<Host> replicaHosts = new ArrayList<>();
+        for (Host host : hosts)
+        {
+            if (dcs.contains(host.getDatacenter()))
                 replicaHosts.add(host);
-            }
         }
         liveReplicaHosts.addAll(replicaHosts);
+        allowedDCs.addAll(dcs);
         logger.trace("Initialized with replica hosts: {}", replicaHosts);
     }
 
@@ -136,7 +144,7 @@ class LimitedLocalNodeFirstLocalBalancingPolicy implements LoadBalancingPolicy
     @Override
     public void onAdd(Host host)
     {
-        if (replicaAddresses.contains(host.getAddress()))
+        if (liveReplicaHosts.contains(host))
         {
             liveReplicaHosts.add(host);
             logger.trace("Added a new host {}", host);
@@ -146,7 +154,7 @@ class LimitedLocalNodeFirstLocalBalancingPolicy implements LoadBalancingPolicy
     @Override
     public void onUp(Host host)
     {
-        if (replicaAddresses.contains(host.getAddress()))
+        if (liveReplicaHosts.contains(host))
         {
             liveReplicaHosts.add(host);
             logger.trace("The host {} is now up", host);
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index fd350bf..08f0612 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3677,14 +3677,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             String keyspace = tablesByKeyspace.getKey();
             if (!Schema.instance.getKeyspaces().contains(keyspace))
             {
-                SystemKeyspace.clearSizeEstimates(keyspace);
+                SystemKeyspace.clearEstimates(keyspace);
             }
             else
             {
                 for (String table : tablesByKeyspace.getValue())
                 {
                     if (Schema.instance.getTableMetadataRef(keyspace, table) == null)
-                        SystemKeyspace.clearSizeEstimates(keyspace, table);
+                        SystemKeyspace.clearEstimates(keyspace, table);
                 }
             }
         }
@@ -3912,6 +3912,32 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return localDCPrimaryRanges;
     }
 
+    public Collection<Range<Token>> getLocalPrimaryRange()
+    {
+        return getLocalPrimaryRangeForEndpoint(FBUtilities.getBroadcastAddressAndPort());
+    }
+
+    public Collection<Range<Token>> getLocalPrimaryRangeForEndpoint(InetAddressAndPort referenceEndpoint)
+    {
+        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+        TokenMetadata tokenMetadata = this.tokenMetadata.cloneOnlyTokenMap();
+        String dc = snitch.getDatacenter(referenceEndpoint);
+        Set<Token> tokens = new HashSet<>(tokenMetadata.getTokens(referenceEndpoint));
+
+        // filter tokens to the single DC
+        List<Token> filteredTokens = Lists.newArrayList();
+        for (Token token : tokenMetadata.sortedTokens())
+        {
+            InetAddressAndPort endpoint = tokenMetadata.getEndpoint(token);
+            if (dc.equals(snitch.getDatacenter(endpoint)))
+                filteredTokens.add(token);
+        }
+
+        return getAllRanges(filteredTokens).stream()
+                                           .filter(t -> tokens.contains(t.right))
+                                           .collect(Collectors.toList());
+    }
+
     /**
      * Get all ranges that span the ring given a set
      * of tokens. All ranges are in sorted order of
diff --git a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
index 8e91342..6e7704a 100644
--- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
@@ -193,6 +193,45 @@ public class StorageServiceServerTest
         // no need to insert extra data, even an "empty" database will have a little information in the system keyspace
         StorageService.instance.takeSnapshot(UUID.randomUUID().toString(), SchemaConstants.SCHEMA_KEYSPACE_NAME);
     }
+    @Test
+    public void testLocalPrimaryRangeForEndpointWithNetworkTopologyStrategy() throws Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.clearUnsafe();
+
+        // DC1
+        metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2"));
+
+        // DC2
+        metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4"));
+        metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5"));
+
+        Map<String, String> configOptions = new HashMap<>();
+        configOptions.put("DC1", "2");
+        configOptions.put("DC2", "2");
+        configOptions.put(ReplicationParams.CLASS, "NetworkTopologyStrategy");
+
+        Keyspace.clear("Keyspace1");
+        KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, configOptions));
+        Schema.instance.load(meta);
+
+        Collection<Range<Token>> primaryRanges = StorageService.instance.getLocalPrimaryRangeForEndpoint(InetAddressAndPort.getByName("127.0.0.1"));
+        assertEquals(1, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("A"))));
+
+        primaryRanges = StorageService.instance.getLocalPrimaryRangeForEndpoint(InetAddressAndPort.getByName("127.0.0.2"));
+        assertEquals(1, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("C"))));
+
+        primaryRanges = StorageService.instance.getLocalPrimaryRangeForEndpoint(InetAddressAndPort.getByName("127.0.0.4"));
+        assertEquals(1, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("B"))));
+
+        primaryRanges = StorageService.instance.getLocalPrimaryRangeForEndpoint(InetAddressAndPort.getByName("127.0.0.5"));
+        assertEquals(1, primaryRanges.size());
+        assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("D"))));
+    }
 
     @Test
     public void testPrimaryRangeForEndpointWithinDCWithNetworkTopologyStrategy() throws Exception
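
For downstream consumers of the new table, here is a minimal read sketch that mirrors the fallback logic in CqlInputFormat.queryTableEstimates. It is not part of the patch; the contact point and the "ks"/"tbl" names are placeholders.

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.ResultSet;
    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.Session;
    import com.datastax.driver.core.exceptions.InvalidQueryException;

    public class TableEstimatesReadSketch
    {
        public static void main(String[] args)
        {
            // "127.0.0.1", "ks" and "tbl" are placeholders; substitute a real contact point and table.
            try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
                 Session session = cluster.connect())
            {
                ResultSet rs;
                try
                {
                    // New in this patch: estimates keyed by range_type, here the node's local primary ranges.
                    rs = session.execute("SELECT range_start, range_end, mean_partition_size, partitions_count " +
                                         "FROM system.table_estimates " +
                                         "WHERE keyspace_name = ? AND table_name = ? AND range_type = 'local_primary'",
                                         "ks", "tbl");
                }
                catch (InvalidQueryException e)
                {
                    // Older node: table_estimates does not exist, so fall back to the legacy table.
                    rs = session.execute("SELECT range_start, range_end, mean_partition_size, partitions_count " +
                                         "FROM system.size_estimates " +
                                         "WHERE keyspace_name = ? AND table_name = ?",
                                         "ks", "tbl");
                }
                for (Row row : rs)
                    System.out.printf("(%s, %s] ~%d partitions, mean partition size %d bytes%n",
                                      row.getString("range_start"), row.getString("range_end"),
                                      row.getLong("partitions_count"), row.getLong("mean_partition_size"));
            }
        }
    }

On a node that predates this patch the first query throws InvalidQueryException because system.table_estimates does not exist yet, and the sketch falls back to the legacy system.size_estimates table, just as the Hadoop code above does.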


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org