You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by az...@apache.org on 2021/10/13 03:49:11 UTC

[cassandra] branch trunk updated: Expose information about stored hints via a nodetool command and a virtual table

This is an automated email from the ASF dual-hosted git repository.

azotcsit pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fc27042  Expose information about stored hints via a nodetool command and a virtual table
fc27042 is described below

commit fc27042f61a6d78ec998a0186a5e97def90fd50a
Author: Aleksandr Sorokoumov <al...@gmail.com>
AuthorDate: Fri Aug 6 15:46:50 2021 +0200

    Expose information about stored hints via a nodetool command and a virtual table
    
    Patch by Aleksandr Sorokoumov; reviewed by Ekaterina Dimitrova, Stefan Miklosovic and Aleksei Zotov for CASSANDRA-14795
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   2 +
 .../cassandra/db/virtual/PendingHintsTable.java    | 115 ++++++++++++++++++++
 .../cassandra/db/virtual/SystemViewsKeyspace.java  |   1 +
 .../cassandra/hints/HintsDispatchExecutor.java     |  36 +++++--
 .../org/apache/cassandra/hints/HintsService.java   |  26 +++++
 .../apache/cassandra/hints/HintsServiceMBean.java  |  10 ++
 .../org/apache/cassandra/hints/HintsStore.java     |  23 ++++
 .../apache/cassandra/hints/PendingHintsInfo.java   |  87 +++++++++++++++
 src/java/org/apache/cassandra/tools/NodeProbe.java |  15 +++
 src/java/org/apache/cassandra/tools/NodeTool.java  |   1 +
 .../cassandra/tools/nodetool/ListPendingHints.java |  98 +++++++++++++++++
 .../cassandra/hints/HintServiceBytemanTest.java    | 117 +++++++++++++++++++++
 .../apache/cassandra/hints/HintsServiceTest.java   |  98 ++---------------
 .../org/apache/cassandra/hints/HintsStoreTest.java |  23 +++-
 .../org/apache/cassandra/hints/HintsTestUtil.java  |  86 +++++++++++++++
 16 files changed, 639 insertions(+), 100 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 424781c..d910c80 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Expose information about stored hints via a nodetool command and a virtual table (CASSANDRA-14795)
  * Add broadcast_rpc_address to system.local (CASSANDRA-11181)
  * Add support for type casting in WHERE clause components and in the values of INSERT/UPDATE statements (CASSANDRA-14337)
  * add credentials file support to CQLSH (CASSANDRA-16983)
diff --git a/NEWS.txt b/NEWS.txt
index 6121525..0bec628 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -56,6 +56,8 @@ Upgrading
       confirm it is set to value lower than 31 otherwise Cassandra will fail to start. See CASSANDRA-9384
       for further details. You also need to regenerate passwords for users for who the password
       was created while the above property was set to be more than 30 otherwise they will not be able to log in.
+    - Information about pending hints is now available through `nodetool listpendinghints` and `pending_hints` virtual
+      table.
 
 Deprecation
 -----------
diff --git a/src/java/org/apache/cassandra/db/virtual/PendingHintsTable.java b/src/java/org/apache/cassandra/db/virtual/PendingHintsTable.java
new file mode 100644
index 0000000..55d648c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/PendingHintsTable.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.marshal.InetAddressType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.TimestampType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.FailureDetectorMBean;
+import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.hints.PendingHintsInfo;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.StorageService;
+
+final class PendingHintsTable extends AbstractVirtualTable
+{
+    private static final String HOST_ID = "host_id";
+    private static final String ADDRESS = "address";
+    private static final String PORT = "port";
+    private static final String RACK = "rack";
+    private static final String DC = "dc";
+    private static final String STATUS = "status";
+    private static final String FILES = "files";
+    private static final String NEWEST = "newest";
+    private static final String OLDEST = "oldest";
+
+    PendingHintsTable(String keyspace)
+    {
+        super(TableMetadata.builder(keyspace, "pending_hints")
+                           .comment("Pending hints that this node has for other nodes")
+                           .kind(TableMetadata.Kind.VIRTUAL)
+                           .partitioner(new LocalPartitioner(UUIDType.instance))
+                           .addPartitionKeyColumn(HOST_ID, UUIDType.instance)
+                           .addRegularColumn(ADDRESS, InetAddressType.instance)
+                           .addRegularColumn(PORT, Int32Type.instance)
+                           .addRegularColumn(RACK, UTF8Type.instance)
+                           .addRegularColumn(DC, UTF8Type.instance)
+                           .addRegularColumn(STATUS, UTF8Type.instance)
+                           .addRegularColumn(FILES, Int32Type.instance)
+                           .addRegularColumn(NEWEST, TimestampType.instance)
+                           .addRegularColumn(OLDEST, TimestampType.instance)
+                           .build());
+    }
+
+    @Override
+    public DataSet data()
+    {
+        List<PendingHintsInfo> pendingHints = HintsService.instance.getPendingHintsInfo();
+        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+
+        SimpleDataSet result = new SimpleDataSet(metadata());
+
+        Map<String, String> simpleStates;
+        if (FailureDetector.instance instanceof FailureDetectorMBean)
+            simpleStates = ((FailureDetectorMBean) FailureDetector.instance).getSimpleStatesWithPort();
+        else
+            simpleStates = Collections.emptyMap();
+
+        for (PendingHintsInfo info : pendingHints)
+        {
+            InetAddressAndPort addressAndPort = StorageService.instance.getEndpointForHostId(info.hostId);
+            InetAddress address = null;
+            Integer port = null;
+            String rack = "Unknown";
+            String dc = "Unknown";
+            String status = "Unknown";
+            if (addressAndPort != null)
+            {
+                address = addressAndPort.getAddress();
+                port = addressAndPort.getPort();
+                rack = snitch.getRack(addressAndPort);
+                dc = snitch.getDatacenter(addressAndPort);
+                status = simpleStates.getOrDefault(addressAndPort.toString(), status);
+            }
+            result.row(info.hostId)
+                  .column(ADDRESS, address)
+                  .column(PORT, port)
+                  .column(RACK, rack)
+                  .column(DC, dc)
+                  .column(STATUS, status)
+                  .column(FILES, info.totalFiles)
+                  .column(NEWEST, new Date(info.newestTimestamp))
+                  .column(OLDEST, new Date(info.oldestTimestamp));
+        }
+        return result;
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
index 92da4af..77773a7 100644
--- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
@@ -36,6 +36,7 @@ public final class SystemViewsKeyspace extends VirtualKeyspace
                     .add(new ThreadPoolsTable(VIRTUAL_VIEWS))
                     .add(new InternodeOutboundTable(VIRTUAL_VIEWS))
                     .add(new InternodeInboundTable(VIRTUAL_VIEWS))
+                    .add(new PendingHintsTable(VIRTUAL_VIEWS))
                     .addAll(TableMetricTables.getAll(VIRTUAL_VIEWS))
                     .build());
     }
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index b6de749..7f70b21 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@ -285,23 +285,39 @@ final class HintsDispatchExecutor
                 if (offset != null)
                     dispatcher.seek(offset);
 
-                if (dispatcher.dispatch())
+                try
                 {
-                    store.delete(descriptor);
-                    store.cleanUp(descriptor);
-                    logger.info("Finished hinted handoff of file {} to endpoint {}: {}", descriptor.fileName(), address, hostId);
-                    return true;
+                    if (dispatcher.dispatch())
+                    {
+                        store.delete(descriptor);
+                        store.cleanUp(descriptor);
+                        logger.info("Finished hinted handoff of file {} to endpoint {}: {}", descriptor.fileName(), address, hostId);
+                        return true;
+                    }
+                    else
+                    {
+                        handleDispatchFailure(dispatcher, descriptor, address);
+                        return false;
+                    }
                 }
-                else
+                // we wrap InterruptedException in UncheckedInterruptedException
+                // without that catch, undispatched HintsDescriptor won't be added back to the store and cleaned
+                // up by HintsStore.delete in tests
+                catch (UncheckedInterruptedException e)
                 {
-                    store.markDispatchOffset(descriptor, dispatcher.dispatchPosition());
-                    store.offerFirst(descriptor);
-                    logger.info("Finished hinted handoff of file {} to endpoint {}: {}, partially", descriptor.fileName(), address, hostId);
-                    return false;
+                    handleDispatchFailure(dispatcher, descriptor, address);
+                    throw e;
                 }
             }
         }
 
+        private void handleDispatchFailure(HintsDispatcher dispatcher, HintsDescriptor descriptor, InetAddressAndPort address)
+        {
+            store.markDispatchOffset(descriptor, dispatcher.dispatchPosition());
+            store.offerFirst(descriptor);
+            logger.info("Finished hinted handoff of file {} to endpoint {}: {}, partially", descriptor.fileName(), address, hostId);
+        }
+
         // for each hint in the hints file for a node that isn't part of the ring anymore, write RF hints for each replica
         private void convert(HintsDescriptor descriptor)
         {
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 8108b50..3fcc00e 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -21,6 +21,7 @@ import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
@@ -273,6 +274,31 @@ public final class HintsService implements HintsServiceMBean
     }
 
     /**
+     * Returns all pending hints that this node has.
+     *
+     * @return a list of {@link PendingHintsInfo}
+     */
+    public List<PendingHintsInfo> getPendingHintsInfo()
+    {
+        return catalog.stores()
+                      .filter(HintsStore::hasFiles)
+                      .map(HintsStore::getPendingHintsInfo)
+                      .collect(Collectors.toList());
+    }
+
+    /**
+     * Returns all pending hints that this node has.
+     *
+     * @return a list of maps with endpoints' ids, total number of hint files, their oldest and newest timestamps.
+     */
+    public List<Map<String, String>> getPendingHints()
+    {
+        return getPendingHintsInfo().stream()
+                                    .map(PendingHintsInfo::asMap)
+                                    .collect(Collectors.toList());
+    }
+
+    /**
      * Deletes all hints for all destinations. Doesn't make snapshots - should be used with care.
      */
     public void deleteAllHints()
diff --git a/src/java/org/apache/cassandra/hints/HintsServiceMBean.java b/src/java/org/apache/cassandra/hints/HintsServiceMBean.java
index fe0abcc..7fd7695 100644
--- a/src/java/org/apache/cassandra/hints/HintsServiceMBean.java
+++ b/src/java/org/apache/cassandra/hints/HintsServiceMBean.java
@@ -17,6 +17,9 @@
  */
 package org.apache.cassandra.hints;
 
+import java.util.List;
+import java.util.Map;
+
 public interface HintsServiceMBean
 {
     /**
@@ -40,4 +43,11 @@ public interface HintsServiceMBean
      * being dispatched right now, or being written to).
      */
     void deleteAllHintsForEndpoint(String address);
+
+    /**
+     * Returns all pending hints that this node has.
+     *
+     * @return a list of endpoints with relevant hint information - total number of files, newest and oldest timestamps.
+     */
+    List<Map<String, String>> getPendingHints();
 }
diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java b/src/java/org/apache/cassandra/hints/HintsStore.java
index 02a1699..29e79bf 100644
--- a/src/java/org/apache/cassandra/hints/HintsStore.java
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@ -23,6 +23,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
@@ -102,6 +105,26 @@ final class HintsStore
         return StorageService.instance.getEndpointForHostId(hostId);
     }
 
+    @Nullable
+    PendingHintsInfo getPendingHintsInfo()
+    {
+        Iterator<HintsDescriptor> descriptors = dispatchDequeue.iterator();
+        int queueSize = 0;
+        long minTimestamp = Long.MAX_VALUE;
+        long maxTimestamp = Long.MIN_VALUE;
+        while (descriptors.hasNext())
+        {
+            HintsDescriptor descriptor = descriptors.next();
+            minTimestamp = Math.min(minTimestamp, descriptor.timestamp);
+            maxTimestamp = Math.max(maxTimestamp, descriptor.timestamp);
+            queueSize++;
+        }
+
+        if (queueSize == 0)
+            return null;
+        return new PendingHintsInfo(hostId, queueSize, minTimestamp, maxTimestamp);
+    }
+
     boolean isLive()
     {
         InetAddressAndPort address = address();
diff --git a/src/java/org/apache/cassandra/hints/PendingHintsInfo.java b/src/java/org/apache/cassandra/hints/PendingHintsInfo.java
new file mode 100644
index 0000000..9fbae16
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/PendingHintsInfo.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+import com.google.common.base.MoreObjects;
+
+public class PendingHintsInfo
+{
+    public static final String HOST_ID = "host_id";
+    public static final String TOTAL_FILES = "total_files";
+    public static final String OLDEST_TIMESTAMP = "oldest_timestamp";
+    public static final String NEWEST_TIMESTAMP = "newest_timestamp";
+
+    public final UUID hostId;
+    public final int totalFiles;
+    public final long oldestTimestamp;
+    public final long newestTimestamp;
+
+    public PendingHintsInfo(UUID hostId, int totalFiles, long oldestTimestamp, long newestTimestamp)
+    {
+        this.hostId = hostId;
+        this.totalFiles = totalFiles;
+        this.oldestTimestamp = oldestTimestamp;
+        this.newestTimestamp = newestTimestamp;
+    }
+
+    public Map<String, String> asMap()
+    {
+        Map<String, String> ret = new HashMap<>();
+        ret.put(HOST_ID, hostId.toString());
+        ret.put(TOTAL_FILES, String.valueOf(totalFiles));
+        ret.put(OLDEST_TIMESTAMP, String.valueOf(oldestTimestamp));
+        ret.put(NEWEST_TIMESTAMP, String.valueOf(newestTimestamp));
+        return ret;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        PendingHintsInfo that = (PendingHintsInfo) o;
+        return totalFiles == that.totalFiles &&
+               oldestTimestamp == that.oldestTimestamp &&
+               newestTimestamp == that.newestTimestamp &&
+               Objects.equals(hostId, that.hostId);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(hostId, totalFiles, oldestTimestamp, newestTimestamp);
+    }
+
+    @Override
+    public String toString()
+    {
+        return MoreObjects.toStringHelper(this)
+                          .add("hostId", hostId)
+                          .add("totalFiles", totalFiles)
+                          .add("oldestTimestamp", oldestTimestamp)
+                          .add("newestTimestamp", newestTimestamp)
+                          .toString();
+    }
+}
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 2f65e58..ae50a68 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -686,6 +686,11 @@ public class NodeProbe implements AutoCloseable
         }
     }
 
+    public Map<String, String> getHostIdToEndpointWithPort()
+    {
+        return ssProxy.getHostIdToEndpointWithPort();
+    }
+
     public String getLocalHostId()
     {
         return ssProxy.getLocalHostId();
@@ -1140,6 +1145,11 @@ public class NodeProbe implements AutoCloseable
         hsProxy.deleteAllHints();
     }
 
+    public List<Map<String, String>> listPendingHints()
+    {
+        return hsProxy.getPendingHints();
+    }
+
     public void refreshSizeEstimates()
     {
         try
@@ -1312,6 +1322,11 @@ public class NodeProbe implements AutoCloseable
         ssProxy.rebuildSecondaryIndex(ksName, cfName, idxNames);
     }
 
+    public Map<String, String> getSimpleStatesWithPort()
+    {
+        return fdProxy.getSimpleStatesWithPort();
+    }
+
     public String getGossipInfo(boolean withPort)
     {
         return withPort ? fdProxy.getAllEndpointStatesWithPort() : fdProxy.getAllEndpointStates();
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 7d3a3ca..63e6eb6 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -159,6 +159,7 @@ public class NodeTool
                 InvalidateRolesCache.class,
                 InvalidateRowCache.class,
                 Join.class,
+                ListPendingHints.class,
                 ListSnapshots.class,
                 Move.class,
                 NetStats.class,
diff --git a/src/java/org/apache/cassandra/tools/nodetool/ListPendingHints.java b/src/java/org/apache/cassandra/tools/nodetool/ListPendingHints.java
new file mode 100644
index 0000000..af414d8
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/ListPendingHints.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import java.net.UnknownHostException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Map;
+
+import io.airlift.airline.Command;
+import org.apache.cassandra.hints.PendingHintsInfo;
+import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool;
+import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
+
+@Command(name = "listpendinghints", description = "Print all pending hints that this node has")
+public class ListPendingHints extends NodeTool.NodeToolCmd
+{
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        List<Map<String, String>> pendingHints = probe.listPendingHints();
+        if(pendingHints.isEmpty())
+        {
+            probe.output().out.println("This node does not have any pending hints");
+        }
+        else
+        {
+            Map<String, String> endpointMap = probe.getHostIdToEndpointWithPort();
+            Map<String, String> simpleStates = probe.getSimpleStatesWithPort();
+            EndpointSnitchInfoMBean epSnitchInfo = probe.getEndpointSnitchInfoProxy();
+
+            DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");
+            TableBuilder tableBuilder = new TableBuilder();
+
+            tableBuilder.add("Host ID", "Address", "Rack", "DC", "Status", "Total files", "Newest", "Oldest");
+            for (Map<String, String> hintInfo : pendingHints)
+            {
+                String endpoint = hintInfo.get(PendingHintsInfo.HOST_ID);
+                String totalFiles = hintInfo.get(PendingHintsInfo.TOTAL_FILES);
+                LocalDateTime newest = Instant.ofEpochMilli(Long.parseLong(hintInfo.get(PendingHintsInfo.NEWEST_TIMESTAMP)))
+                                              .atZone(ZoneId.of("UTC"))
+                                              .toLocalDateTime();
+                LocalDateTime oldest = Instant.ofEpochMilli(Long.parseLong(hintInfo.get(PendingHintsInfo.OLDEST_TIMESTAMP)))
+                                              .atZone(ZoneId.of("UTC"))
+                                              .toLocalDateTime();
+                String address = endpointMap.get(endpoint);
+                String rack = null;
+                String dc = null;
+                String status = null;
+                try
+                {
+                    rack = epSnitchInfo.getRack(address);
+                    dc = epSnitchInfo.getDatacenter(address);
+                    status = simpleStates.getOrDefault(InetAddressAndPort.getByName(address).toString(),
+                                                       "Unknown");
+                }
+                catch (UnknownHostException e)
+                {
+                    rack = rack != null ? rack : "Unknown";
+                    dc = dc != null ? dc : "Unknown";
+                    status = "Unknown";
+                }
+
+                tableBuilder.add(endpoint,
+                                 address,
+                                 rack,
+                                 dc,
+                                 status,
+                                 String.valueOf(totalFiles),
+                                 dtf.format(newest),
+                                 dtf.format(oldest));
+            }
+            tableBuilder.printTo(probe.output().out);
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/hints/HintServiceBytemanTest.java b/test/unit/org/apache/cassandra/hints/HintServiceBytemanTest.java
new file mode 100644
index 0000000..b7f431d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/HintServiceBytemanTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.MockMessagingService;
+import org.apache.cassandra.net.MockMessagingSpy;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.awaitility.Awaitility;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static org.apache.cassandra.hints.HintsTestUtil.MockFailureDetector;
+import static org.apache.cassandra.hints.HintsTestUtil.sendHintsAndResponses;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(BMUnitRunner.class)
+public class HintServiceBytemanTest
+{
+    private static final String KEYSPACE = "hints_service_test";
+    private static final String TABLE = "table";
+
+    private final MockFailureDetector failureDetector = new HintsTestUtil.MockFailureDetector();
+    private static TableMetadata metadata;
+
+    @BeforeClass
+    public static void defineSchema()
+    {
+        SchemaLoader.prepareServer();
+        StorageService.instance.initServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, TABLE));
+        metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+    }
+
+    @After
+    public void cleanup()
+    {
+        MockMessagingService.cleanup();
+    }
+
+    @Before
+    public void reinstanciateService() throws Throwable
+    {
+        MessagingService.instance().inboundSink.clear();
+        MessagingService.instance().outboundSink.clear();
+
+        if (!HintsService.instance.isShutDown())
+        {
+            HintsService.instance.shutdownBlocking();
+            HintsService.instance.deleteAllHints();
+        }
+
+        failureDetector.isAlive = true;
+
+        HintsService.instance = new HintsService(failureDetector);
+
+        HintsService.instance.startDispatch();
+    }
+
+    @Test
+    @BMRule(name = "Delay delivering hints",
+    targetClass = "DispatchHintsTask",
+    targetMethod = "run",
+    action = "Thread.sleep(DatabaseDescriptor.getHintsFlushPeriodInMS() * 3L)")
+    public void testListPendingHints() throws InterruptedException, ExecutionException
+    {
+        HintsService.instance.resumeDispatch();
+        MockMessagingSpy spy = sendHintsAndResponses(metadata, 20000, -1);
+        Awaitility.await("For the hints file to flush")
+                  .atMost(Duration.ofMillis(DatabaseDescriptor.getHintsFlushPeriodInMS() * 2L))
+                  .until(() -> !HintsService.instance.getPendingHints().isEmpty());
+
+        List<PendingHintsInfo> pendingHints = HintsService.instance.getPendingHintsInfo();
+        assertEquals(1, pendingHints.size());
+        PendingHintsInfo info = pendingHints.get(0);
+        assertEquals(StorageService.instance.getLocalHostUUID(), info.hostId);
+        assertEquals(1, info.totalFiles);
+        assertEquals(info.oldestTimestamp, info.newestTimestamp); // there is 1 descriptor with only 1 timestamp
+
+        spy.interceptMessageOut(20000).get();
+        assertEquals(Collections.emptyList(), HintsService.instance.getPendingHints());
+    }
+}
diff --git a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
index dddf336..dd0eb5a 100644
--- a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.hints;
 
-import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -33,26 +32,17 @@ import org.junit.Test;
 
 import com.datastax.driver.core.utils.MoreFutures;
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.NoPayload;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.gms.IFailureDetectionEventListener;
-import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.metrics.StorageMetrics;
-import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.MockMessagingService;
 import org.apache.cassandra.net.MockMessagingSpy;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.StorageService;
 
-import static org.apache.cassandra.Util.dk;
-import static org.apache.cassandra.net.Verb.HINT_REQ;
-import static org.apache.cassandra.net.Verb.HINT_RSP;
-import static org.apache.cassandra.net.MockMessagingService.verb;
+import static org.apache.cassandra.hints.HintsTestUtil.MockFailureDetector;
+import static org.apache.cassandra.hints.HintsTestUtil.sendHintsAndResponses;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -62,6 +52,7 @@ public class HintsServiceTest
     private static final String TABLE = "table";
 
     private final MockFailureDetector failureDetector = new MockFailureDetector();
+    private static TableMetadata metadata;
 
     @BeforeClass
     public static void defineSchema()
@@ -71,6 +62,7 @@ public class HintsServiceTest
         SchemaLoader.createKeyspace(KEYSPACE,
                 KeyspaceParams.simple(1),
                 SchemaLoader.standardCFMD(KEYSPACE, TABLE));
+        metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
     }
 
     @After
@@ -104,7 +96,7 @@ public class HintsServiceTest
         long cnt = StorageMetrics.totalHints.getCount();
 
         // create spy for hint messages
-        MockMessagingSpy spy = sendHintsAndResponses(100, -1);
+        MockMessagingSpy spy = sendHintsAndResponses(metadata, 100, -1);
 
         // metrics should have been updated with number of create hints
         assertEquals(cnt + 100, StorageMetrics.totalHints.getCount());
@@ -120,7 +112,7 @@ public class HintsServiceTest
         HintsService.instance.pauseDispatch();
 
         // create spy for hint messages
-        MockMessagingSpy spy = sendHintsAndResponses(100, -1);
+        MockMessagingSpy spy = sendHintsAndResponses(metadata, 100, -1);
 
         // we should not send any hints while paused
         ListenableFuture<Boolean> noMessagesWhilePaused = spy.interceptNoMsg(15, TimeUnit.SECONDS);
@@ -143,7 +135,7 @@ public class HintsServiceTest
     public void testPageRetry() throws InterruptedException, ExecutionException, TimeoutException
     {
         // create spy for hint messages, but only create responses for 5 hints
-        MockMessagingSpy spy = sendHintsAndResponses(20, 5);
+        MockMessagingSpy spy = sendHintsAndResponses(metadata, 20, 5);
 
         Futures.allAsList(
                 // the dispatcher will always send all hints within the current page
@@ -164,7 +156,7 @@ public class HintsServiceTest
     public void testPageSeek() throws InterruptedException, ExecutionException
     {
         // create spy for hint messages, stop replying after 12k (should be on 3rd page)
-        MockMessagingSpy spy = sendHintsAndResponses(20000, 12000);
+        MockMessagingSpy spy = sendHintsAndResponses(metadata, 20000, 12000);
 
         // At this point the dispatcher will constantly retry the page we stopped acking,
         // thus we receive the same hints from the page multiple times and in total more than
@@ -181,74 +173,4 @@ public class HintsServiceTest
         assertTrue(dispatchOffset != null);
         assertTrue(((ChecksummedDataInput.Position) dispatchOffset).sourcePosition > 0);
     }
-
-    private MockMessagingSpy sendHintsAndResponses(int noOfHints, int noOfResponses)
-    {
-        // create spy for hint messages, but only create responses for noOfResponses hints
-        Message<NoPayload> message = Message.internalResponse(HINT_RSP, NoPayload.noPayload);
-
-        MockMessagingSpy spy;
-        if (noOfResponses != -1)
-        {
-            spy = MockMessagingService.when(verb(HINT_REQ)).respondN(message, noOfResponses);
-        }
-        else
-        {
-            spy = MockMessagingService.when(verb(HINT_REQ)).respond(message);
-        }
-
-        // create and write noOfHints using service
-        UUID hostId = StorageService.instance.getLocalHostUUID();
-        for (int i = 0; i < noOfHints; i++)
-        {
-            long now = System.currentTimeMillis();
-            DecoratedKey dkey = dk(String.valueOf(i));
-            TableMetadata metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
-            PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(metadata, dkey).timestamp(now);
-            builder.row("column0").add("val", "value0");
-            Hint hint = Hint.create(builder.buildAsMutation(), now);
-            HintsService.instance.write(hostId, hint);
-        }
-        return spy;
-    }
-
-    private static class MockFailureDetector implements IFailureDetector
-    {
-        private boolean isAlive = true;
-
-        public boolean isAlive(InetAddressAndPort ep)
-        {
-            return isAlive;
-        }
-
-        public void interpret(InetAddressAndPort ep)
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        public void report(InetAddressAndPort ep)
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener)
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener)
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        public void remove(InetAddressAndPort ep)
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        public void forceConviction(InetAddressAndPort ep)
-        {
-            throw new UnsupportedOperationException();
-        }
-    }
 }
diff --git a/test/unit/org/apache/cassandra/hints/HintsStoreTest.java b/test/unit/org/apache/cassandra/hints/HintsStoreTest.java
index c9a0d57..c184778 100644
--- a/test/unit/org/apache/cassandra/hints/HintsStoreTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsStoreTest.java
@@ -107,12 +107,13 @@ public class HintsStoreTest
 
     /**
      * Test multiple threads delete hints files.
-     * It could happens when hint services is running a removal process, meanwhile operator issues a NodeTool command to delete.
+     * It could happen when hint service is running a removal process, meanwhile operator issues a NodeTool command to delete.
      *
      * Thread contends and delete part of the files in the store. The final effect should all files get deleted.
      */
     @Test
-    public void testConcurrentDeleteExpiredHints() throws Exception {
+    public void testConcurrentDeleteExpiredHints() throws Exception
+    {
         final long now = System.currentTimeMillis();
         for (int i = 100; i >= 0; i--)
         {
@@ -137,6 +138,24 @@ public class HintsStoreTest
         assertFalse("All hints files should be deleted", store.hasFiles());
     }
 
+    @Test
+    public void testPendingHintsInfo() throws Exception
+    {
+        HintsStore store = HintsCatalog.load(directory, ImmutableMap.of()).get(hostId);
+        assertNull(store.getPendingHintsInfo());
+
+        final long t1 = 10;
+        writeHints(directory, new HintsDescriptor(hostId, t1), 100, t1);
+        store = HintsCatalog.load(directory, ImmutableMap.of()).get(hostId);
+        assertEquals(new PendingHintsInfo(store.hostId, 1, t1, t1),
+                     store.getPendingHintsInfo());
+        final long t2 = t1 + 1;
+        writeHints(directory, new HintsDescriptor(hostId, t2), 100, t2);
+        store = HintsCatalog.load(directory, ImmutableMap.of()).get(hostId);
+        assertEquals(new PendingHintsInfo(store.hostId, 2, t1, t2),
+                     store.getPendingHintsInfo());
+    }
+
     private long writeHints(File directory, HintsDescriptor descriptor, int hintsCount, long hintCreationTime) throws IOException
     {
         try (HintsWriter writer = HintsWriter.create(directory, descriptor))
diff --git a/test/unit/org/apache/cassandra/hints/HintsTestUtil.java b/test/unit/org/apache/cassandra/hints/HintsTestUtil.java
index c1c6192..6c95f3e 100644
--- a/test/unit/org/apache/cassandra/hints/HintsTestUtil.java
+++ b/test/unit/org/apache/cassandra/hints/HintsTestUtil.java
@@ -17,13 +17,30 @@
  */
 package org.apache.cassandra.hints;
 
+import java.util.UUID;
+
 import com.google.common.collect.Iterators;
 
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.partitions.AbstractBTreePartition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MockMessagingService;
+import org.apache.cassandra.net.MockMessagingSpy;
+import org.apache.cassandra.net.NoPayload;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Clock;
 
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.net.MockMessagingService.verb;
+import static org.apache.cassandra.net.Verb.HINT_REQ;
+import static org.apache.cassandra.net.Verb.HINT_RSP;
 
 final class HintsTestUtil
 {
@@ -45,4 +62,73 @@ final class HintsTestUtil
         assertEquals(expected.creationTime, actual.creationTime);
         assertEquals(expected.gcgs, actual.gcgs);
     }
+
+    static MockMessagingSpy sendHintsAndResponses(TableMetadata metadata, int noOfHints, int noOfResponses)
+    {
+        // create spy for hint messages, but only create responses for noOfResponses hints
+        Message<NoPayload> message = Message.internalResponse(HINT_RSP, NoPayload.noPayload);
+
+        MockMessagingSpy spy;
+        if (noOfResponses != -1)
+        {
+            spy = MockMessagingService.when(verb(HINT_REQ)).respondN(message, noOfResponses);
+        }
+        else
+        {
+            spy = MockMessagingService.when(verb(HINT_REQ)).respond(message);
+        }
+
+        // create and write noOfHints using service
+        UUID hostId = StorageService.instance.getLocalHostUUID();
+        for (int i = 0; i < noOfHints; i++)
+        {
+            long now = Clock.Global.currentTimeMillis();
+            DecoratedKey dkey = dk(String.valueOf(i));
+            PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(metadata, dkey).timestamp(now);
+            builder.row("column0").add("val", "value0");
+            Hint hint = Hint.create(builder.buildAsMutation(), now);
+            HintsService.instance.write(hostId, hint);
+        }
+        return spy;
+    }
+
+    static class MockFailureDetector implements IFailureDetector
+    {
+        boolean isAlive = true;
+
+        public boolean isAlive(InetAddressAndPort ep)
+        {
+            return isAlive;
+        }
+
+        public void interpret(InetAddressAndPort ep)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void report(InetAddressAndPort ep)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void remove(InetAddressAndPort ep)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void forceConviction(InetAddressAndPort ep)
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org