You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/02/15 17:21:05 UTC

[1/3] git commit: r/m equals override patch by jbellis; reviewed by Vijay for CASSANDRA-6712

Repository: cassandra
Updated Branches:
  refs/heads/trunk 41433073f -> f64b31c42


r/m equals override
patch by jbellis; reviewed by Vijay for CASSANDRA-6712


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fc503dd5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fc503dd5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fc503dd5

Branch: refs/heads/trunk
Commit: fc503dd52fd743a7e519dccc1be3a8bfd7606aef
Parents: 4143307
Author: Jonathan Ellis <jb...@apache.org>
Authored: Sat Feb 15 10:06:39 2014 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Sat Feb 15 10:06:55 2014 -0600

----------------------------------------------------------------------
 src/java/org/apache/cassandra/config/CFMetaData.java | 9 ---------
 1 file changed, 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc503dd5/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 010a277..738627b 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -405,15 +405,6 @@ public final class CFMetaData
         }
 
         @Override
-        public boolean equals(Object obj)
-        {
-            if (! (obj instanceof SpeculativeRetry))
-                return false;
-            SpeculativeRetry rhs = (SpeculativeRetry) obj;
-            return Objects.equal(type, rhs.type) && Objects.equal(value, rhs.value);
-        }
-
-        @Override
         public String toString()
         {
             switch (type)


[2/3] git commit: Add flush directory distinct from compaction directories patch by jbellis and aleksey for CASSANDRA-6357

Posted by jb...@apache.org.
Add flush directory distinct from compaction directories
patch by jbellis and aleksey for CASSANDRA-6357


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f10148f4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f10148f4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f10148f4

Branch: refs/heads/trunk
Commit: f10148f4f1719f99bc3888b93f5a285c8af997d1
Parents: fc503dd
Author: Jonathan Ellis <jb...@apache.org>
Authored: Sat Feb 15 10:11:28 2014 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Sat Feb 15 10:11:28 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |  15 +-
 .../org/apache/cassandra/config/Config.java     |   6 +-
 .../cassandra/config/DatabaseDescriptor.java    |  19 ++-
 .../org/apache/cassandra/db/Directories.java    | 158 +++++++++++--------
 src/java/org/apache/cassandra/db/Memtable.java  |   5 +
 .../db/compaction/AbstractCompactionTask.java   |   5 +
 .../db/compaction/CompactionManager.java        |   4 +-
 .../cassandra/db/compaction/Scrubber.java       |   2 +-
 .../cassandra/io/util/DiskAwareRunnable.java    |   4 +-
 .../cassandra/streaming/StreamReader.java       |   3 +-
 .../cassandra/streaming/StreamReceiveTask.java  |   3 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |   8 +-
 .../apache/cassandra/db/DirectoriesTest.java    |   8 +-
 .../io/sstable/SSTableSimpleWriterTest.java     |   2 +-
 15 files changed, 150 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5a3d7a9..abfd680 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1
+ * Add flush directory distinct from compaction directories (CASSANDRA-6357)
  * Require JNA by default (CASSANDRA-6575)
  * add listsnapshots command to nodetool (CASSANDRA-5742)
  * Introduce AtomicBTreeColumns (CASSANDRA-6271)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 8ddd5e0..8538920 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -109,9 +109,15 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner
 data_file_directories:
     - /var/lib/cassandra/data
 
-# commit log
+# commit log.  when running on magnetic HDD, this should be on a
+# separate spindle from the data directories.
 commitlog_directory: /var/lib/cassandra/commitlog
 
+# location to write flushing sstables to.  Ideally, this will also be
+# a separate spindle in HDD deployments.  If you only have two spindles,
+# have it share with the data spindle.
+flush_directory: /var/lib/cassandra/flush
+
 # policy for data disk failures:
 # stop: shut down gossip and Thrift, leaving the node effectively dead, but
 #       can still be inspected via JMX.
@@ -296,10 +302,9 @@ memtable_cleanup_threshold: 0.4
 
 # This sets the amount of memtable flush writer threads.  These will
 # be blocked by disk io, and each one will hold a memtable in memory
-# while blocked. If you have a large heap and many data directories,
-# you can increase this value for better flush performance.
-# By default this will be set to the amount of data directories defined.
-#memtable_flush_writers: 1
+# while blocked. If your flush directory is backed by SSD, you may
+# want to increase this; by default it will be set to 2.
+#memtable_flush_writers: 2
 
 # A fixed memory pool size in MB for for SSTable index summaries. If left
 # empty, this will default to 5% of the heap size. If the memory usage of

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 2eb9b18..1f47171 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -77,7 +77,10 @@ public class Config
     @Deprecated
     public Integer concurrent_replicates = null;
 
-    public Integer memtable_flush_writers = null; // will get set to the length of data dirs in DatabaseDescriptor
+    // we don't want a lot of contention, but we also don't want to starve all other tables
+    // if a big one flushes. OS buffering should be able to minimize contention with 2 threads.
+    public int memtable_flush_writers = 2;
+
     public Integer memtable_total_space_in_mb;
     public float memtable_cleanup_threshold = 0.4f;
 
@@ -123,6 +126,7 @@ public class Config
     public volatile Integer stream_throughput_outbound_megabits_per_sec = 200;
 
     public String[] data_file_directories;
+    public String flush_directory;
 
     public String saved_caches_directory;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 378fa8a..29dece6 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -264,15 +264,8 @@ public class DatabaseDescriptor
             throw new ConfigurationException("memtable_heap_space_in_mb must be positive");
         logger.info("Global memtable heap threshold is enabled at {}MB", conf.memtable_total_space_in_mb);
 
-        /* Memtable flush writer threads */
-        if (conf.memtable_flush_writers != null && conf.memtable_flush_writers < 1)
-        {
+        if (conf.memtable_flush_writers < 1)
             throw new ConfigurationException("memtable_flush_writers must be at least 1");
-        }
-        else if (conf.memtable_flush_writers == null)
-        {
-            conf.memtable_flush_writers = conf.data_file_directories.length;
-        }
 
         /* Local IP or hostname to bind services to */
         if (conf.listen_address != null)
@@ -404,6 +397,9 @@ public class DatabaseDescriptor
         /* data file and commit log directories. they get created later, when they're needed. */
         if (conf.commitlog_directory != null && conf.data_file_directories != null && conf.saved_caches_directory != null)
         {
+            if (conf.flush_directory == null)
+                conf.flush_directory = conf.data_file_directories[0];
+
             for (String datadir : conf.data_file_directories)
             {
                 if (datadir.equals(conf.commitlog_directory))
@@ -613,6 +609,8 @@ public class DatabaseDescriptor
                 throw new ConfigurationException("saved_caches_directory must be specified");
 
             FileUtils.createDirectory(conf.saved_caches_directory);
+
+            FileUtils.createDirectory(conf.flush_directory);
         }
         catch (ConfigurationException e)
         {
@@ -1404,4 +1402,9 @@ public class DatabaseDescriptor
         String arch = System.getProperty("os.arch");
         return arch.contains("64") || arch.contains("sparcv9");
     }
+
+    public static String getFlushLocation()
+    {
+        return conf.flush_directory;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 6cd9da1..a6b7efa 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import static com.google.common.collect.Sets.newHashSet;
-
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOError;
@@ -33,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -40,20 +39,26 @@ import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.collect.Iterables;
 import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.Uninterruptibles;
-
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableDeletingTask;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
+import static com.google.common.collect.Sets.newHashSet;
+
 /**
  * Encapsulate handling of paths to the data files.
  *
@@ -86,13 +91,15 @@ public class Directories
     public static final String SNAPSHOT_SUBDIR = "snapshots";
     public static final String SECONDARY_INDEX_NAME_SEPARATOR = ".";
 
-    public static final DataDirectory[] dataFileLocations;
+    public static final DataDirectory[] dataDirectories;
+    public static final DataDirectory flushDirectory;
     static
     {
         String[] locations = DatabaseDescriptor.getAllDataFileLocations();
-        dataFileLocations = new DataDirectory[locations.length];
+        dataDirectories = new DataDirectory[locations.length];
         for (int i = 0; i < locations.length; ++i)
-            dataFileLocations[i] = new DataDirectory(new File(locations[i]));
+            dataDirectories[i] = new DataDirectory(new File(locations[i]));
+        flushDirectory = new DataDirectory(new File(DatabaseDescriptor.getFlushLocation()));
     }
 
 
@@ -170,7 +177,8 @@ public class Directories
     }
 
     private final CFMetaData metadata;
-    private final File[] sstableDirectories;
+    private final File[] dataPaths;
+    private final File flushPath;
 
     /**
      * Create Directories of given ColumnFamily.
@@ -181,57 +189,77 @@ public class Directories
     public Directories(CFMetaData metadata)
     {
         this.metadata = metadata;
-        this.sstableDirectories = new File[dataFileLocations.length];
+        if (StorageService.instance.isClientMode())
+        {
+            dataPaths = null;
+            flushPath = null;
+            return;
+        }
+
+        String cfId = ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(metadata.cfId));
+        int idx = metadata.cfName.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
+        // secondary indices go in the same directory as the base cf
+        String directoryName = idx > 0 ? metadata.cfName.substring(0, idx) + "-" + cfId : metadata.cfName + "-" + cfId;
 
-        // Determine SSTable directories
-        // If upgraded from version less than 2.1, use directories already exist.
-        for (int i = 0; i < dataFileLocations.length; ++i)
+        this.dataPaths = new File[dataDirectories.length];
+        // If upgraded from version less than 2.1, use existing directories
+        for (int i = 0; i < dataDirectories.length; ++i)
         {
             // check if old SSTable directory exists
-            sstableDirectories[i] = new File(dataFileLocations[i].location, join(metadata.ksName, metadata.cfName));
+            dataPaths[i] = new File(dataDirectories[i].location, join(metadata.ksName, this.metadata.cfName));
         }
-        boolean olderDirectoryExists = Iterables.any(Arrays.asList(sstableDirectories), new Predicate<File>()
+        boolean olderDirectoryExists = Iterables.any(Arrays.asList(dataPaths), new Predicate<File>()
         {
             public boolean apply(File file)
             {
                 return file.exists();
             }
         });
-        if (olderDirectoryExists)
-            return;
-
-        // create directory name
-        String directoryName;
-        String cfId = ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(metadata.cfId));
-        int idx = metadata.cfName.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
-        if (idx > 0)
-            // secondary index, goes in the same directory than the base cf
-            directoryName = metadata.cfName.substring(0, idx) + "-" + cfId;
-        else
-            directoryName = metadata.cfName + "-" + cfId;
+        if (!olderDirectoryExists)
+        {
+            // use 2.1-style path names
+            for (int i = 0; i < dataDirectories.length; ++i)
+                dataPaths[i] = new File(dataDirectories[i].location, join(metadata.ksName, directoryName));
+        }
 
-        for (int i = 0; i < dataFileLocations.length; ++i)
-            sstableDirectories[i] = new File(dataFileLocations[i].location, join(metadata.ksName, directoryName));
+        flushPath = new File(flushDirectory.location, join(metadata.ksName, directoryName));
 
-        if (!StorageService.instance.isClientMode())
+        for (File dir : allSSTablePaths())
         {
-            for (File dir : sstableDirectories)
+            try
             {
-                try
-                {
-                    FileUtils.createDirectory(dir);
-                }
-                catch (FSError e)
-                {
-                    // don't just let the default exception handler do this, we need the create loop to continue
-                    logger.error("Failed to create {} directory", dir);
-                    FileUtils.handleFSError(e);
-                }
+                FileUtils.createDirectory(dir);
+            }
+            catch (FSError e)
+            {
+                // don't just let the default exception handler do this, we need the create loop to continue
+                logger.error("Failed to create {} directory", dir);
+                FileUtils.handleFSError(e);
             }
         }
     }
 
     /**
+     * @return an iterable of all possible sstable paths, including flush and post-compaction locations.
+     * Guaranteed to only return one copy of each path, even if there is no dedicated flush location and
+     * it shares with the others.
+     */
+    private Iterable<File> allSSTablePaths()
+    {
+        return ImmutableSet.<File>builder().add(dataPaths).add(flushPath).build();
+    }
+
+    /**
+     * @return an iterable of all possible sstable directories, including flush and post-compaction locations.
+     * Guaranteed to only return one copy of each directory, even if there is no dedicated flush location and
+     * it shares with the others.
+     */
+    private static Iterable<DataDirectory> allSSTableDirectories()
+    {
+        return ImmutableSet.<DataDirectory>builder().add(dataDirectories).add(flushDirectory).build();
+    }
+
+    /**
      * Returns SSTable location which is inside given data directory.
      *
      * @param dataDirectory
@@ -239,7 +267,7 @@ public class Directories
      */
     public File getLocationForDisk(DataDirectory dataDirectory)
     {
-        for (File dir : sstableDirectories)
+        for (File dir : allSSTablePaths())
         {
             if (dir.getAbsolutePath().startsWith(dataDirectory.location.getAbsolutePath()))
                 return dir;
@@ -249,7 +277,7 @@ public class Directories
 
     public Descriptor find(String filename)
     {
-        for (File dir : sstableDirectories)
+        for (File dir : allSSTablePaths())
         {
             if (new File(dir, filename).exists())
                 return Descriptor.fromFilename(dir, filename).left;
@@ -257,9 +285,9 @@ public class Directories
         return null;
     }
 
-    public File getDirectoryForNewSSTables()
+    public File getDirectoryForCompactedSSTables()
     {
-        File path = getWriteableLocationAsFile();
+        File path = getCompactionLocationAsFile();
 
         // Requesting GC has a chance to free space only if we're using mmap and a non SUN jvm
         if (path == null
@@ -272,15 +300,15 @@ public class Directories
             // Note: GCInspector will do this already, but only sun JVM supports GCInspector so far
             SSTableDeletingTask.rescheduleFailedTasks();
             Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
-            path = getWriteableLocationAsFile();
+            path = getCompactionLocationAsFile();
         }
 
         return path;
     }
 
-    public File getWriteableLocationAsFile()
+    public File getCompactionLocationAsFile()
     {
-        return getLocationForDisk(getWriteableLocation());
+        return getLocationForDisk(getCompactionLocation());
     }
 
     /**
@@ -288,12 +316,12 @@ public class Directories
      *
      * @throws IOError if all directories are blacklisted.
      */
-    public DataDirectory getWriteableLocation()
+    public DataDirectory getCompactionLocation()
     {
         List<DataDirectory> candidates = new ArrayList<>();
 
         // pick directories with enough space and so that resulting sstable dirs aren't blacklisted for writes.
-        for (DataDirectory dataDir : dataFileLocations)
+        for (DataDirectory dataDir : dataDirectories)
         {
             if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
                 continue;
@@ -318,6 +346,12 @@ public class Directories
         return candidates.get(0);
     }
 
+    public DataDirectory getFlushLocation()
+    {
+        return BlacklistedDirectories.isUnwritable(flushPath)
+               ? getCompactionLocation()
+               : flushDirectory;
+    }
 
     public static File getSnapshotDirectory(Descriptor desc, String snapshotName)
     {
@@ -430,7 +464,7 @@ public class Directories
             if (filtered)
                 return;
 
-            for (File location : sstableDirectories)
+            for (File location : allSSTablePaths())
             {
                 if (BlacklistedDirectories.isUnreadable(location))
                     continue;
@@ -492,7 +526,7 @@ public class Directories
     public Map<String, Pair<Long, Long>> getSnapshotDetails()
     {
         final Map<String, Pair<Long, Long>> snapshotSpaceMap = new HashMap<>();
-        for (final File dir : sstableDirectories)
+        for (final File dir : allSSTablePaths())
         {
             final File snapshotDir = new File(dir,SNAPSHOT_SUBDIR);
             if (snapshotDir.exists() && snapshotDir.isDirectory())
@@ -522,7 +556,7 @@ public class Directories
     }
     public boolean snapshotExists(String snapshotName)
     {
-        for (File dir : sstableDirectories)
+        for (File dir : allSSTablePaths())
         {
             File snapshotDir = new File(dir, join(SNAPSHOT_SUBDIR, snapshotName));
             if (snapshotDir.exists())
@@ -550,7 +584,7 @@ public class Directories
     // The snapshot must exist
     public long snapshotCreationTime(String snapshotName)
     {
-        for (File dir : sstableDirectories)
+        for (File dir : allSSTablePaths())
         {
             File snapshotDir = new File(dir, join(SNAPSHOT_SUBDIR, snapshotName));
             if (snapshotDir.exists())
@@ -562,7 +596,7 @@ public class Directories
     public long trueSnapshotsSize()
     {
         long result = 0L;
-        for (File dir : sstableDirectories)
+        for (File dir : allSSTablePaths())
             result += getTrueAllocatedSizeIn(new File(dir, join(SNAPSHOT_SUBDIR)));
         return result;
     }
@@ -594,7 +628,7 @@ public class Directories
     public static List<File> getKSChildDirectories(String ksName)
     {
         List<File> result = new ArrayList<>();
-        for (DataDirectory dataDirectory : dataFileLocations)
+        for (DataDirectory dataDirectory : allSSTableDirectories())
         {
             File ksDir = new File(dataDirectory.location, ksName);
             File[] cfDirs = ksDir.listFiles();
@@ -612,7 +646,7 @@ public class Directories
     public List<File> getCFDirectories()
     {
         List<File> result = new ArrayList<>();
-        for (File dataDirectory : sstableDirectories)
+        for (File dataDirectory : allSSTablePaths())
         {
             if (dataDirectory.isDirectory())
                 result.add(dataDirectory);
@@ -640,19 +674,19 @@ public class Directories
         return StringUtils.join(s, File.separator);
     }
 
-    // Hack for tests, don't use otherwise
+    @VisibleForTesting
     static void overrideDataDirectoriesForTest(String loc)
     {
-        for (int i = 0; i < dataFileLocations.length; ++i)
-            dataFileLocations[i] = new DataDirectory(new File(loc));
+        for (int i = 0; i < dataDirectories.length; ++i)
+            dataDirectories[i] = new DataDirectory(new File(loc));
     }
 
-    // Hack for tests, don't use otherwise
+    @VisibleForTesting
     static void resetDataDirectoriesAfterTest()
     {
         String[] locations = DatabaseDescriptor.getAllDataFileLocations();
         for (int i = 0; i < locations.length; ++i)
-            dataFileLocations[i] = new DataDirectory(new File(locations[i]));
+            dataDirectories[i] = new DataDirectory(new File(locations[i]));
     }
     
     private class TrueFilesSizeVisitor extends SimpleFileVisitor<Path>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 9e76e6f..246639e 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -304,6 +304,11 @@ public class Memtable
                                     * 1.2); // bloom filter and row index overhead
         }
 
+        protected Directories.DataDirectory getWriteableLocation()
+        {
+            return cfs.directories.getFlushLocation();
+        }
+
         public long getExpectedWriteSize()
         {
             return estimatedSize;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index a4c101f..f1fcbd1 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -49,6 +49,11 @@ public abstract class AbstractCompactionTask extends DiskAwareRunnable
             assert compacting.contains(sstable) : sstable.getFilename() + " is not correctly marked compacting";
     }
 
+    protected Directories.DataDirectory getWriteableLocation()
+    {
+        return cfs.directories.getCompactionLocation();
+    }
+
     /**
      * executes the task and unmarks sstables compacting
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 6c09efb..bfffc8f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -626,7 +626,7 @@ public class CompactionManager implements CompactionManagerMBean
 
             logger.info("Cleaning up {}", sstable);
 
-            File compactionFileLocation = cfs.directories.getDirectoryForNewSSTables();
+            File compactionFileLocation = cfs.directories.getDirectoryForCompactedSSTables();
             if (compactionFileLocation == null)
                 throw new IOException("disk full");
 
@@ -919,7 +919,7 @@ public class CompactionManager implements CompactionManagerMBean
             }
 
             logger.info("Anticompacting {}", sstable);
-            File destination = cfs.directories.getDirectoryForNewSSTables();
+            File destination = cfs.directories.getDirectoryForCompactedSSTables();
             SSTableWriter repairedSSTableWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);
             SSTableWriter unRepairedSSTableWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index e1da811..3a71136 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -78,7 +78,7 @@ public class Scrubber implements Closeable
         this.skipCorrupted = skipCorrupted;
 
         // Calculate the expected compacted filesize
-        this.destination = cfs.directories.getDirectoryForNewSSTables();
+        this.destination = cfs.directories.getDirectoryForCompactedSSTables();
         if (destination == null)
             throw new IOException("disk full");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index 198a88d..f0e2756 100644
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@ -34,7 +34,7 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
         while (true)
         {
             writeSize = getExpectedWriteSize();
-            directory = getDirectories().getWriteableLocation();
+            directory = getWriteableLocation();
             if (directory != null || !reduceScopeForLimitedSpace())
                 break;
         }
@@ -54,6 +54,8 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
         }
     }
 
+    protected abstract Directories.DataDirectory getWriteableLocation();
+
     /**
      * Get sstable directories for the CF.
      * @return Directories instance for the CF.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index d805bf3..c26a61b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -40,7 +40,6 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -111,7 +110,7 @@ public class StreamReader
 
     protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt) throws IOException
     {
-        Directories.DataDirectory localDir = cfs.directories.getWriteableLocation();
+        Directories.DataDirectory localDir = cfs.directories.getCompactionLocation();
         if (localDir == null)
             throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
         desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index b4d5392..e5ef691 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.Pair;
 
@@ -98,7 +97,7 @@ public class StreamReceiveTask extends StreamTask
             Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
             ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-            StreamLockfile lockfile = new StreamLockfile(cfs.directories.getWriteableLocationAsFile(), UUID.randomUUID());
+            StreamLockfile lockfile = new StreamLockfile(cfs.directories.getCompactionLocationAsFile(), UUID.randomUUID());
             lockfile.create(task.sstables);
             List<SSTableReader> readers = new ArrayList<>();
             for (SSTableWriter writer : task.sstables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 5c71d9b..200925c 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -870,7 +870,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         for (int version = 1; version <= 2; ++version)
         {
-            Descriptor existing = new Descriptor(cfs.directories.getDirectoryForNewSSTables(), "Keyspace2", "Standard1", version, false);
+            Descriptor existing = new Descriptor(cfs.directories.getDirectoryForCompactedSSTables(), "Keyspace2", "Standard1", version, false);
             Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), "Keyspace2", "Standard1", version, false);
             for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.STATS })
                 assertTrue("can not find backedup file:" + desc.filenameFor(c), new File(desc.filenameFor(c)).exists());
@@ -1576,7 +1576,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         ByteBuffer key = bytes("key");
 
         // 1st sstable
-        SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(), cfmeta, StorageService.getPartitioner());
+        SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForCompactedSSTables(), cfmeta, StorageService.getPartitioner());
         writer.newRow(key);
         writer.addColumn(bytes("col"), bytes("val"), 1);
         writer.close();
@@ -1588,7 +1588,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
 
         // simulate incomplete compaction
-        writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
+        writer = new SSTableSimpleWriter(dir.getDirectoryForCompactedSSTables(),
                                          cfmeta, StorageService.getPartitioner())
         {
             protected SSTableWriter getWriter()
@@ -1643,7 +1643,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         // Write SSTable generation 3 that has ancestors 1 and 2
         final Set<Integer> ancestors = Sets.newHashSet(1, 2);
-        SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
+        SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForCompactedSSTables(),
                                                 cfmeta, StorageService.getPartitioner())
         {
             protected SSTableWriter getWriter()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index a9cd2a3..c831b29 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -116,7 +116,7 @@ public class DirectoriesTest
         for (CFMetaData cfm : CFM)
         {
             Directories directories = new Directories(cfm);
-            assertEquals(cfDir(cfm), directories.getDirectoryForNewSSTables());
+            assertEquals(cfDir(cfm), directories.getDirectoryForCompactedSSTables());
 
             Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, false);
             File snapshotDir = new File(cfDir(cfm),  File.separator + Directories.SNAPSHOT_SUBDIR + File.separator + "42");
@@ -183,7 +183,7 @@ public class DirectoriesTest
         {
             DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.best_effort);
             
-            for (DataDirectory dd : Directories.dataFileLocations)
+            for (DataDirectory dd : Directories.dataDirectories)
             {
                 dd.location.setExecutable(false);
                 dd.location.setWritable(false);
@@ -199,7 +199,7 @@ public class DirectoriesTest
         } 
         finally 
         {
-            for (DataDirectory dd : Directories.dataFileLocations)
+            for (DataDirectory dd : Directories.dataDirectories)
             {
                 dd.location.setExecutable(true);
                 dd.location.setWritable(true);
@@ -215,7 +215,7 @@ public class DirectoriesTest
         for (final CFMetaData cfm : CFM)
         {
             final Directories directories = new Directories(cfm);
-            assertEquals(cfDir(cfm), directories.getDirectoryForNewSSTables());
+            assertEquals(cfDir(cfm), directories.getDirectoryForCompactedSSTables());
             final String n = Long.toString(System.nanoTime());
             Callable<File> directoryGetter = new Callable<File>() {
                 public File call() throws Exception {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f10148f4/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java
index 15980a4..5995fe3 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java
@@ -44,7 +44,7 @@ public class SSTableSimpleWriterTest extends SchemaLoader
         String cfname = "StandardInteger1";
 
         Keyspace t = Keyspace.open(keyspaceName); // make sure we create the directory
-        File dir = new Directories(Schema.instance.getCFMetaData(keyspaceName, cfname)).getDirectoryForNewSSTables();
+        File dir = new Directories(Schema.instance.getCFMetaData(keyspaceName, cfname)).getDirectoryForCompactedSSTables();
         assert dir.exists();
 
         IPartitioner partitioner = StorageService.getPartitioner();


[3/3] git commit: wip

Posted by jb...@apache.org.
wip


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f64b31c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f64b31c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f64b31c4

Branch: refs/heads/trunk
Commit: f64b31c426b53c264e65180e858f0c283cf5edb2
Parents: f10148f
Author: Jonathan Ellis <jb...@apache.org>
Authored: Sat Feb 15 10:18:12 2014 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Sat Feb 15 10:18:12 2014 -0600

----------------------------------------------------------------------
 build.xml                                       |   2 +
 conf/cassandra.yaml                             |   3 +
 lib/licenses/super-csv-2.1.0.txt                | 202 +++++++++++++++++++
 lib/super-csv-2.1.0.jar                         | Bin 0 -> 91473 bytes
 .../org/apache/cassandra/config/Config.java     |  48 ++++-
 .../cassandra/config/DatabaseDescriptor.java    |  40 +++-
 .../config/YamlConfigurationLoader.java         |   1 +
 .../apache/cassandra/service/StorageProxy.java  |  22 +-
 .../cassandra/service/StorageProxyMBean.java    |   3 +
 .../org/apache/cassandra/tools/NodeProbe.java   |   5 +
 .../org/apache/cassandra/tools/NodeTool.java    |   9 +-
 11 files changed, 330 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f64b31c4/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 20f6a35..878de29 100644
--- a/build.xml
+++ b/build.xml
@@ -390,6 +390,7 @@
           <dependency groupId="io.netty" artifactId="netty" version="3.6.6.Final" />
           <dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
           <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
+          <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
         </dependencyManagement>
         <developer id="alakshman" name="Avinash Lakshman"/>
         <developer id="antelder" name="Anthony Elder"/>
@@ -474,6 +475,7 @@
         <dependency groupId="com.addthis.metrics" artifactId="reporter-config"/>
         <dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.3"/>
         <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
+        <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
 
         <dependency groupId="ch.qos.logback" artifactId="logback-core"/>
         <dependency groupId="ch.qos.logback" artifactId="logback-classic"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f64b31c4/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 8538920..e6118d4 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -30,6 +30,9 @@ num_tokens: 256
 # initial_token:
 
 # See http://wiki.apache.org/cassandra/HintedHandoff
+# May be either "true" or "false" to enable or disable hints globally, or a
+# comma-separated list of data centers to enable hints only for those DCs.
+# hinted_handoff_enabled: DC1,DC2
 hinted_handoff_enabled: true
 # this defines the maximum amount of time a dead host will have hints
 # generated.  After it has been dead this long, new hints for it will not be

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f64b31c4/lib/licenses/super-csv-2.1.0.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/super-csv-2.1.0.txt b/lib/licenses/super-csv-2.1.0.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/lib/licenses/super-csv-2.1.0.txt
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f64b31c4/lib/super-csv-2.1.0.jar
----------------------------------------------------------------------
diff --git a/lib/super-csv-2.1.0.jar b/lib/super-csv-2.1.0.jar
new file mode 100644
index 0000000..6a85716
Binary files /dev/null and b/lib/super-csv-2.1.0.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f64b31c4/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 1f47171..65dcfd3 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -17,8 +17,18 @@
  */
 package org.apache.cassandra.config;
 
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.supercsv.io.CsvListReader;
+import org.supercsv.prefs.CsvPreference;
+
 import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.util.NativeAllocator;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -38,7 +48,9 @@ public class Config
     public String partitioner;
 
     public Boolean auto_bootstrap = true;
-    public volatile Boolean hinted_handoff_enabled = true;
+    public volatile boolean hinted_handoff_enabled_global = true;
+    public String hinted_handoff_enabled;
+    public Set<String> hinted_handoff_enabled_by_dc = Sets.newConcurrentHashSet();
     public volatile Integer max_hint_window_in_ms = 3600 * 1000; // one hour
 
     public SeedProviderDef seed_provider;
@@ -221,6 +233,40 @@ public class Config
         isClientMode = clientMode;
     }
 
+    public void configHintedHandoff() throws ConfigurationException
+    {
+        if (hinted_handoff_enabled != null && !hinted_handoff_enabled.isEmpty())
+        {
+            if (hinted_handoff_enabled.toLowerCase().equalsIgnoreCase("true"))
+            {
+                hinted_handoff_enabled_global = true;
+            }
+            else if (hinted_handoff_enabled.toLowerCase().equalsIgnoreCase("false"))
+            {
+                hinted_handoff_enabled_global = false;
+            }
+            else
+            {
+                try
+                {
+                    hinted_handoff_enabled_by_dc.addAll(parseHintedHandoffEnabledDCs(hinted_handoff_enabled));
+                }
+                catch (IOException e)
+                {
+                    throw new ConfigurationException("Invalid hinted_handoff_enabled parameter " + hinted_handoff_enabled, e);
+                }
+            }
+        }
+    }
+
+    private static final CsvPreference STANDARD_SURROUNDING_SPACES_NEED_QUOTES = new CsvPreference.Builder(CsvPreference.STANDARD_PREFERENCE).surroundingSpacesNeedQuotes(true).build();
+
+    public static List<String> parseHintedHandoffEnabledDCs(final String dcNames) throws IOException
+    {
+        final CsvListReader csvListReader = new CsvListReader(new StringReader(dcNames), STANDARD_SURROUNDING_SPACES_NEED_QUOTES);
+        return csvListReader.read();
+    }
+
     public static enum CommitLogSync
     {
         periodic,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f64b31c4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 29dece6..c7c91eb 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.config;
 
 import java.io.File;
 import java.io.FileFilter;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
@@ -1129,12 +1130,47 @@ public class DatabaseDescriptor
 
     public static void setHintedHandoffEnabled(boolean hintedHandoffEnabled)
     {
-        conf.hinted_handoff_enabled = hintedHandoffEnabled;
+        conf.hinted_handoff_enabled_global = hintedHandoffEnabled;
+        conf.hinted_handoff_enabled_by_dc.clear();
+    }
+
+    public static void setHintedHandoffEnabled(final String dcNames)
+    {
+        List<String> dcNameList;
+        try
+        {
+            dcNameList = Config.parseHintedHandoffEnabledDCs(dcNames);
+        }
+        catch (IOException e)
+        {
+            throw new IllegalArgumentException("Could not read csv of dcs for hinted handoff enable. " + dcNames, e);
+        }
+
+        if (dcNameList.isEmpty())
+            throw new IllegalArgumentException("Empty list of Dcs for hinted handoff enable");
+
+        conf.hinted_handoff_enabled_by_dc.clear();
+        conf.hinted_handoff_enabled_by_dc.addAll(dcNameList);
     }
 
     public static boolean hintedHandoffEnabled()
     {
-        return conf.hinted_handoff_enabled;
+        return conf.hinted_handoff_enabled_global;
+    }
+
+    public static Set<String> hintedHandoffEnabledByDC()
+    {
+        return Collections.unmodifiableSet(conf.hinted_handoff_enabled_by_dc);
+    }
+
+    public static boolean shouldHintByDC()
+    {
+        return !conf.hinted_handoff_enabled_by_dc.isEmpty();
+    }
+
+    public static boolean hintedHandoffEnabled(final String dcName)
+    {
+        return conf.hinted_handoff_enabled_by_dc.contains(dcName);
     }
 
     public static void setMaxHintWindow(int ms)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f64b31c4/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
index aefc431..4a1280c 100644
--- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
+++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
@@ -99,6 +99,7 @@ public class YamlConfigurationLoader implements ConfigurationLoader
             constructor.setPropertyUtils(propertiesChecker);
             Yaml yaml = new Yaml(constructor);
             Config result = yaml.loadAs(new ByteArrayInputStream(configBytes), Config.class);
+            result.configHintedHandoff();
             propertiesChecker.check();
             return result;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f64b31c4/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 05fdd61..5a51838 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1843,11 +1843,21 @@ public class StorageProxy implements StorageProxyMBean
         return DatabaseDescriptor.hintedHandoffEnabled();
     }
 
+    public Set<String> getHintedHandoffEnabledByDC()
+    {
+        return DatabaseDescriptor.hintedHandoffEnabledByDC();
+    }
+
     public void setHintedHandoffEnabled(boolean b)
     {
         DatabaseDescriptor.setHintedHandoffEnabled(b);
     }
 
+    public void setHintedHandoffEnabledByDCList(String dcNames)
+    {
+        DatabaseDescriptor.setHintedHandoffEnabled(dcNames);
+    }
+
     public int getMaxHintWindow()
     {
         return DatabaseDescriptor.getMaxHintWindow();
@@ -1860,7 +1870,17 @@ public class StorageProxy implements StorageProxyMBean
 
     public static boolean shouldHint(InetAddress ep)
     {
-        if (!DatabaseDescriptor.hintedHandoffEnabled())
+        if (DatabaseDescriptor.shouldHintByDC())
+        {
+            final String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(ep);
+            //Disable DC specific hints
+            if(!DatabaseDescriptor.hintedHandoffEnabled(dc))
+            {
+                HintedHandOffManager.instance.metrics.incrPastWindow(ep);
+                return false;
+            }
+        }
+        else if (!DatabaseDescriptor.hintedHandoffEnabled())
         {
             HintedHandOffManager.instance.metrics.incrPastWindow(ep);
             return false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f64b31c4/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 24dd069..a04b660 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.service;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public interface StorageProxyMBean
 {
@@ -72,7 +73,9 @@ public interface StorageProxyMBean
 
     public long getTotalHints();
     public boolean getHintedHandoffEnabled();
+    public Set<String> getHintedHandoffEnabledByDC();
     public void setHintedHandoffEnabled(boolean b);
+    public void setHintedHandoffEnabledByDCList(String dcs);
     public int getMaxHintWindow();
     public void setMaxHintWindow(int ms);
     public int getMaxHintsInProgress();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f64b31c4/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 4df3ae2..78da62a 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -745,6 +745,11 @@ public class NodeProbe implements AutoCloseable
         spProxy.setHintedHandoffEnabled(true);
     }
 
+    public void enableHintedHandoff(String dcNames)
+    {
+        spProxy.setHintedHandoffEnabledByDCList(dcNames);
+    }
+
     public void pauseHintsDelivery()
     {
         hhProxy.pauseHintsDelivery(true);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f64b31c4/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 94bce74..453491b 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -1220,10 +1220,17 @@ public class NodeTool
     @Command(name = "enablehandoff", description = "Reenable the future hints storing on the current node")
     public static class EnableHandoff extends NodeToolCmd
     {
+        @Arguments(usage = "<dc-name>,<dc-name>", description = "Enable hinted handoff only for these DCs")
+        private List<String> args = new ArrayList<>();
+
         @Override
         public void execute(NodeProbe probe)
         {
-            probe.enableHintedHandoff();
+            checkArgument(args.size() <= 1, "enablehandoff accepts at most one argument: a comma-separated list of DC names");
+            if (args.size() == 1) // a DC list was supplied: pass it through unchanged
+                probe.enableHintedHandoff(args.get(0));
+            else // no DC list: use the argument-less enable
+                probe.enableHintedHandoff();
         }
     }