You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/06/28 13:22:00 UTC

[10/17] cassandra git commit: Merge commit '6555a87bde4daeb8bd5d9558595a367ec6bc061d' into cassandra-3.0

Merge commit '6555a87bde4daeb8bd5d9558595a367ec6bc061d' into cassandra-3.0

* commit '6555a87bde4daeb8bd5d9558595a367ec6bc061d':
  Avoid stalling Paxos when the paxos state expires


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/70059726
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/70059726
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/70059726

Branch: refs/heads/trunk
Commit: 70059726f08a98ea21af91ce3855bf62f6f4b652
Parents: cb4540e 6555a87
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Jun 28 15:19:08 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Jun 28 15:19:57 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                       |  1 +
 .../org/apache/cassandra/cql3/QueryProcessor.java | 17 ++++++++++++++++-
 .../cql3/statements/SelectStatement.java          |  6 +++++-
 .../org/apache/cassandra/db/SystemKeyspace.java   | 12 ++++++------
 .../apache/cassandra/service/StorageProxy.java    |  4 +++-
 .../cassandra/service/paxos/PaxosState.java       | 11 +++++++++--
 .../cassandra/service/paxos/PrepareCallback.java  | 18 +++++++++++++++++-
 src/java/org/apache/cassandra/utils/UUIDGen.java  | 11 +++++++++++
 8 files changed, 68 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 314a93e,9f42d98..aaeafd6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -9,38 -2,9 +9,39 @@@ Merged from 2.2
   * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
   * Validate bloom_filter_fp_chance against lowest supported
     value when the table is created (CASSANDRA-11920)
 - * RandomAccessReader: call isEOF() only when rebuffering, not for every read operation (CASSANDRA-12013)
   * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038)
   * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
 +Merged from 2.1:
++ * Avoid stalling paxos when the paxos state expires (CASSANDRA-12043)
 + * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854)
 + * Don't try to get sstables for non-repairing column families (CASSANDRA-12077)
 + * Avoid marking too many sstables as repaired (CASSANDRA-11696)
 + * Prevent select statements with clustering key > 64k (CASSANDRA-11882)
 + * Fix clock skew corrupting other nodes with paxos (CASSANDRA-11991)
 + * Remove distinction between non-existing static columns and existing but null in LWTs (CASSANDRA-9842)
 + * Cache local ranges when calculating repair neighbors (CASSANDRA-11934)
 + * Allow LWT operation on static column with only partition keys (CASSANDRA-10532)
 + * Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
 + * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
 +
 +
 +3.0.7
 + * Fix legacy serialization of Thrift-generated non-compound range tombstones
 +   when communicating with 2.x nodes (CASSANDRA-11930)
 + * Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849)
 + * Avoid referencing DatabaseDescriptor in AbstractType (CASSANDRA-11912)
 + * Fix sstables not being protected from removal during index build (CASSANDRA-11905)
 + * cqlsh: Suppress stack trace from Read/WriteFailures (CASSANDRA-11032)
 + * Remove unneeded code to repair index summaries that have
 +   been improperly down-sampled (CASSANDRA-11127)
 + * Avoid WriteTimeoutExceptions during commit log replay due to materialized
 +   view lock contention (CASSANDRA-11891)
 + * Prevent OOM failures on SSTable corruption, improve tests for corruption detection (CASSANDRA-9530)
 + * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705)
 + * Allow compaction strategies to disable early open (CASSANDRA-11754)
 + * Refactor Materialized View code (CASSANDRA-11475)
 + * Update Java Driver (CASSANDRA-11615)
 +Merged from 2.2:
   * Persist local metadata earlier in startup sequence (CASSANDRA-11742)
   * Run CommitLog tests with different compression settings (CASSANDRA-9039)
   * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index da146ef,c702679..af94d3e
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -273,10 -288,10 +273,10 @@@ public class QueryProcessor implements 
              AbstractType type = prepared.boundNames.get(i).type;
              boundValues.add(value instanceof ByteBuffer || value == null ? (ByteBuffer)value : type.decompose(value));
          }
 -        return QueryOptions.forInternalCalls(boundValues);
 +        return QueryOptions.forInternalCalls(cl, boundValues);
      }
  
-     private static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException
+     public static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException
      {
          ParsedStatement.Prepared prepared = internalStatements.get(query);
          if (prepared != null)
@@@ -343,19 -340,42 +343,34 @@@
              return null;
      }
  
+     /**
+      * A special version of executeInternal that takes the time used as "now" for the query in argument.
+      * Note that this only make sense for Selects so this only accept SELECT statements and is only useful in rare
+      * cases.
+      */
 -    public static UntypedResultSet executeInternalWithNow(long now, String query, Object... values)
++    public static UntypedResultSet executeInternalWithNow(int nowInSec, String query, Object... values)
+     {
 -        try
 -        {
 -            ParsedStatement.Prepared prepared = prepareInternal(query);
 -            assert prepared.statement instanceof SelectStatement;
 -            SelectStatement select = (SelectStatement)prepared.statement;
 -            ResultMessage result = select.executeInternal(internalQueryState(), makeInternalOptions(prepared, values), now);
 -            assert result instanceof ResultMessage.Rows;
 -            return UntypedResultSet.create(((ResultMessage.Rows)result).result);
 -        }
 -        catch (RequestExecutionException e)
 -        {
 -            throw new RuntimeException(e);
 -        }
 -        catch (RequestValidationException e)
 -        {
 -            throw new RuntimeException("Error validating query " + query, e);
 -        }
++        ParsedStatement.Prepared prepared = prepareInternal(query);
++        assert prepared.statement instanceof SelectStatement;
++        SelectStatement select = (SelectStatement)prepared.statement;
++        ResultMessage result = select.executeInternal(internalQueryState(), makeInternalOptions(prepared, values), nowInSec);
++        assert result instanceof ResultMessage.Rows;
++        return UntypedResultSet.create(((ResultMessage.Rows)result).result);
+     }
+ 
 -    public static UntypedResultSet resultify(String query, Row row)
 +    public static UntypedResultSet resultify(String query, RowIterator partition)
      {
 -        return resultify(query, Collections.singletonList(row));
 +        return resultify(query, PartitionIterators.singletonIterator(partition));
      }
  
 -    public static UntypedResultSet resultify(String query, List<Row> rows)
 +    public static UntypedResultSet resultify(String query, PartitionIterator partitions)
      {
 -        SelectStatement ss = (SelectStatement) getStatement(query, null).statement;
 -        ResultSet cqlRows = ss.process(rows);
 -        return UntypedResultSet.create(cqlRows);
 +        try (PartitionIterator iter = partitions)
 +        {
 +            SelectStatement ss = (SelectStatement) getStatement(query, null).statement;
 +            ResultSet cqlRows = ss.process(iter, FBUtilities.nowInSeconds());
 +            return UntypedResultSet.create(cqlRows);
 +        }
      }
  
      public ResultMessage.Prepared prepare(String query,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 0e33475,8820ff7..aca6146
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@@ -401,33 -280,45 +401,37 @@@ public class SelectStatement implement
          return new ResultMessage.Rows(rset);
      }
  
 -    static List<Row> readLocally(String keyspaceName, List<ReadCommand> cmds)
 -    {
 -        Keyspace keyspace = Keyspace.open(keyspaceName);
 -        List<Row> rows = new ArrayList<Row>(cmds.size());
 -        for (ReadCommand cmd : cmds)
 -            rows.add(cmd.getRow(keyspace));
 -        return rows;
 -    }
 -
      public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
      {
-         int nowInSec = FBUtilities.nowInSeconds();
 -        return executeInternal(state, options, System.currentTimeMillis());
++        return executeInternal(state, options, FBUtilities.nowInSeconds());
+     }
+ 
 -    public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, long now) throws RequestExecutionException, RequestValidationException
++    public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, int nowInSec) throws RequestExecutionException, RequestValidationException
+     {
 -        int limit = getLimit(options);
 -        Pageable command = getPageableCommand(options, limit, now);
 +        int userLimit = getLimit(options);
 +        ReadQuery query = getQuery(options, nowInSec, userLimit);
          int pageSize = getPageSize(options);
  
 -        if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
 +        try (ReadOrderGroup orderGroup = query.startOrderGroup())
          {
 -            List<Row> rows = command == null
 -                             ? Collections.<Row>emptyList()
 -                             : (command instanceof Pageable.ReadCommands
 -                                ? readLocally(keyspace(), ((Pageable.ReadCommands)command).commands)
 -                                : ((RangeSliceCommand)command).executeLocally());
 -
 -            return processResults(rows, options, limit, now);
 +            if (pageSize <= 0 || query.limits().count() <= pageSize)
 +            {
 +                try (PartitionIterator data = query.executeInternal(orderGroup))
 +                {
 +                    return processResults(data, options, nowInSec, userLimit);
 +                }
 +            }
 +            else
 +            {
 +                QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion());
 +                return execute(Pager.forInternalQuery(pager, orderGroup), options, pageSize, nowInSec, userLimit);
 +            }
          }
 -
 -        QueryPager pager = QueryPagers.localPager(command);
 -        return execute(pager, options, limit, now, pageSize);
      }
  
 -    public ResultSet process(List<Row> rows) throws InvalidRequestException
 +    public ResultSet process(PartitionIterator partitions, int nowInSec) throws InvalidRequestException
      {
 -        QueryOptions options = QueryOptions.DEFAULT;
 -        return process(rows, options, getLimit(options), System.currentTimeMillis());
 +        return process(partitions, QueryOptions.DEFAULT, nowInSec, getLimit(QueryOptions.DEFAULT));
      }
  
      public String keyspace()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index c5c6abe,e0d5f66..da96b38
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -1082,10 -894,10 +1082,10 @@@ public final class SystemKeyspac
          return null;
      }
  
-     public static PaxosState loadPaxosState(DecoratedKey key, CFMetaData metadata)
 -    public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata, long now)
++    public static PaxosState loadPaxosState(DecoratedKey key, CFMetaData metadata, int nowInSec)
      {
          String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
-         UntypedResultSet results = executeInternal(String.format(req, PAXOS), key.getKey(), metadata.cfId);
 -        UntypedResultSet results = QueryProcessor.executeInternalWithNow(now, String.format(req, PAXOS), key, metadata.cfId);
++        UntypedResultSet results = QueryProcessor.executeInternalWithNow(nowInSec, String.format(req, PAXOS), key.getKey(), metadata.cfId);
          if (results.isEmpty())
              return new PaxosState(key, metadata);
          UntypedResultSet.Row row = results.one();
@@@ -1110,43 -920,41 +1110,43 @@@
          String req = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?";
          executeInternal(String.format(req, PAXOS),
                          UUIDGen.microsTimestamp(promise.ballot),
-                         paxosTtl(promise.update.metadata()),
 -                        paxosTtl(promise.update.metadata),
++                        paxosTtlSec(promise.update.metadata()),
                          promise.ballot,
 -                        promise.key,
 -                        promise.update.id());
 +                        promise.update.partitionKey().getKey(),
 +                        promise.update.metadata().cfId);
      }
  
      public static void savePaxosProposal(Commit proposal)
      {
 -        executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS),
 +        executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ?, proposal_version = ? WHERE row_key = ? AND cf_id = ?", PAXOS),
                          UUIDGen.microsTimestamp(proposal.ballot),
-                         paxosTtl(proposal.update.metadata()),
 -                        paxosTtl(proposal.update.metadata),
++                        paxosTtlSec(proposal.update.metadata()),
                          proposal.ballot,
 -                        proposal.update.toBytes(),
 -                        proposal.key,
 -                        proposal.update.id());
 +                        PartitionUpdate.toBytes(proposal.update, MessagingService.current_version),
 +                        MessagingService.current_version,
 +                        proposal.update.partitionKey().getKey(),
 +                        proposal.update.metadata().cfId);
      }
  
-     private static int paxosTtl(CFMetaData metadata)
 -    public static int paxosTtl(CFMetaData metadata)
++    public static int paxosTtlSec(CFMetaData metadata)
      {
          // keep paxos state around for at least 3h
 -        return Math.max(3 * 3600, metadata.getGcGraceSeconds());
 +        return Math.max(3 * 3600, metadata.params.gcGraceSeconds);
      }
  
      public static void savePaxosCommit(Commit commit)
      {
          // We always erase the last proposal (with the commit timestamp to no erase more recent proposal in case the commit is old)
          // even though that's really just an optimization  since SP.beginAndRepairPaxos will exclude accepted proposal older than the mrc.
 -        String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ? WHERE row_key = ? AND cf_id = ?";
 +        String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = ? WHERE row_key = ? AND cf_id = ?";
          executeInternal(String.format(cql, PAXOS),
                          UUIDGen.microsTimestamp(commit.ballot),
-                         paxosTtl(commit.update.metadata()),
 -                        paxosTtl(commit.update.metadata),
++                        paxosTtlSec(commit.update.metadata()),
                          commit.ballot,
 -                        commit.update.toBytes(),
 -                        commit.key,
 -                        commit.update.id());
 +                        PartitionUpdate.toBytes(commit.update, MessagingService.current_version),
 +                        MessagingService.current_version,
 +                        commit.update.partitionKey().getKey(),
 +                        commit.update.metadata().cfId);
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 34d7c40,03dd209..483da67
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -31,6 -30,6 +31,7 @@@ import javax.management.ObjectName
  import com.google.common.base.Predicate;
  import com.google.common.cache.CacheLoader;
  import com.google.common.collect.*;
++import com.google.common.primitives.Ints;
  import com.google.common.util.concurrent.Uninterruptibles;
  import org.apache.commons.lang3.StringUtils;
  import org.slf4j.Logger;
@@@ -439,7 -425,7 +440,8 @@@ public class StorageProxy implements St
              // https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810)
              // Since we waited for quorum nodes, if some of them haven't seen the last commit (which may just be a timing issue, but may also
              // mean we lost messages), we pro-actively "repair" those nodes, and retry.
-             Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit();
 -            Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit(metadata, ballotMicros);
++            int nowInSec = Ints.checkedCast(TimeUnit.MICROSECONDS.toSeconds(ballotMicros));
++            Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit(metadata, nowInSec);
              if (Iterables.size(missingMRC) > 0)
              {
                  Tracing.trace("Repairing replicas that missed the most recent commit");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/paxos/PaxosState.java
index 0b3af8f,fde881b..e01f568
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@@ -65,7 -63,13 +65,13 @@@ public class PaxosStat
              lock.lock();
              try
              {
-                 PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.update.partitionKey(), toPrepare.update.metadata());
+                 // When preparing, we need to use the same time as "now" (that's the time we use to decide if something
+                 // is expired or not) accross nodes otherwise we may have a window where a Most Recent Commit shows up
+                 // on some replica and not others during a new proposal (in StorageProxy.beginAndRepairPaxos()), and no
+                 // amount of re-submit will fix this (because the node on which the commit has expired will have a
+                 // tombstone that hides any re-submit). See CASSANDRA-12043 for details.
 -                long now = UUIDGen.unixTimestamp(toPrepare.ballot);
 -                PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata(), now);
++                int nowInSec = UUIDGen.unixTimestampInSec(toPrepare.ballot);
++                PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.update.partitionKey(), toPrepare.update.metadata(), nowInSec);
                  if (toPrepare.isAfter(state.promised))
                  {
                      Tracing.trace("Promising ballot {}", toPrepare.ballot);
@@@ -100,7 -104,8 +106,8 @@@
              lock.lock();
              try
              {
-                 PaxosState state = SystemKeyspace.loadPaxosState(proposal.update.partitionKey(), proposal.update.metadata());
 -                long now = UUIDGen.unixTimestamp(proposal.ballot);
 -                PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata(), now);
++                int nowInSec = UUIDGen.unixTimestampInSec(proposal.ballot);
++                PaxosState state = SystemKeyspace.loadPaxosState(proposal.update.partitionKey(), proposal.update.metadata(), nowInSec);
                  if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised))
                  {
                      Tracing.trace("Accepting proposal {}", proposal);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index 9c54b01,081f457..ff81803
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@@ -87,8 -89,21 +90,21 @@@ public class PrepareCallback extends Ab
          latch.countDown();
      }
  
-     public Iterable<InetAddress> replicasMissingMostRecentCommit()
 -    public Iterable<InetAddress> replicasMissingMostRecentCommit(CFMetaData metadata, long now)
++    public Iterable<InetAddress> replicasMissingMostRecentCommit(CFMetaData metadata, int nowInSec)
      {
+         // In general, we need every replicas that have answered to the prepare (a quorum) to agree on the MRC (see
+         // coment in StorageProxy.beginAndRepairPaxos(), but basically we need to make sure at least a quorum of nodes
+         // have learn a commit before commit a new one otherwise that previous commit is not guaranteed to have reach a
+         // quorum and further commit may proceed on incomplete information).
+         // However, if that commit is too hold, it may have been expired from some of the replicas paxos table (we don't
+         // keep the paxos state forever or that could grow unchecked), and we could end up in some infinite loop as
+         // explained on CASSANDRA-12043. To avoid that, we ignore a MRC that is too old, i.e. older than the TTL we set
+         // on paxos tables. For such old commit, we rely on hints and repair to ensure the commit has indeed be
+         // propagated to all nodes.
 -        long paxosTtlMicros = SystemKeyspace.paxosTtl(metadata) * 1000 * 1000;
 -        if (UUIDGen.microsTimestamp(mostRecentCommit.ballot) + paxosTtlMicros < now)
++        long paxosTtlSec = SystemKeyspace.paxosTtlSec(metadata);
++        if (UUIDGen.unixTimestampInSec(mostRecentCommit.ballot) + paxosTtlSec < nowInSec)
+             return Collections.emptySet();
+ 
          return Iterables.filter(commitsByReplica.keySet(), new Predicate<InetAddress>()
          {
              public boolean apply(InetAddress inetAddress)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/70059726/src/java/org/apache/cassandra/utils/UUIDGen.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/UUIDGen.java
index a673f05,78b8b57..3efcb5e
--- a/src/java/org/apache/cassandra/utils/UUIDGen.java
+++ b/src/java/org/apache/cassandra/utils/UUIDGen.java
@@@ -25,9 -25,9 +25,11 @@@ import java.security.SecureRandom
  import java.util.Collection;
  import java.util.Random;
  import java.util.UUID;
++import java.util.concurrent.TimeUnit;
  
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Charsets;
++import com.google.common.primitives.Ints;
  
  
  /**
@@@ -203,6 -203,6 +205,15 @@@ public class UUIDGe
  
      /**
       * @param uuid
++     * @return seconds since Unix epoch
++     */
++    public static int unixTimestampInSec(UUID uuid)
++    {
++        return Ints.checkedCast(TimeUnit.MILLISECONDS.toSeconds(unixTimestamp(uuid)));
++    }
++
++    /**
++     * @param uuid
       * @return microseconds since Unix epoch
       */
      public static long microsTimestamp(UUID uuid)