Posted to commits@cassandra.apache.org by "Kelvin Kakugawa (JIRA)" <ji...@apache.org> on 2010/06/18 20:48:24 UTC

[jira] Commented: (CASSANDRA-1072) Increment counters

    [ https://issues.apache.org/jira/browse/CASSANDRA-1072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12880280#action_12880280 ] 

Kelvin Kakugawa commented on CASSANDRA-1072:
--------------------------------------------

{noformat}
context-based clocks

interface extensions to cassandra.thrift:
  replace timestamp w/ Clock()
    Clock:
      optional long timestamp
      optional byte[] context

data structure code changes:
  db.ColumnFamilyType + db.ClockType
    enums
    db.ColumnFamilyType:
      Super / Standard
    db.ClockType:
      Timestamp / IncrementCounter

    applied to all IColumnContainer sub-classes (CF / SC)
      checked to determine switches in code

  db.context package
    IContext:
      context creation + manipulation
    AbstractReconciler
      context-based clock reconciliation

  IncrementCounterContext
    context structure (current):
      {timestamp of last update + [(node id, count), ...]}

    compare():
      timestamp-based compare (of last update); the highest timestamp wins

    diff():
      tuple-based comparison
        greater than:
          has at least every node in the comparison context, and each count is larger than the corresponding count there

  db.IClock
    concrete *Clock representations
    encapsulates db.context.IContext functionality
    current sub-classes:
      TimestampClock
      IncrementCounterClock
    where the ClockType knows which contextManager (db.context.IContext) to use

  db.IColumn
    timestamp replaced w/ IClock
    markedForDeleteAt replaced w/ IClock

algorithm code changes:
1) on insert
  a) thrift.CassandraServer : doInsert(...)
    thrift.ThriftValidation : validateClock(Clock)
      takes a thrift Clock and creates the appropriate IClock impl

  b) service.StorageProxy : mutateBlocking(...)
    db.RowMutation : updateClocks()
      iterates through all CFs w/in RM
        for any context-based CF type
          creates appropriate context structure
            i) counter
              looks at value being inserted, then creates appropriate context
                e.g. {timestamp + [(replica node id, value as long in bytes)]}

  c) local / remote insert
    db.Table : apply()
      CF.addColumn()
        inserts into CSLM (ConcurrentSkipListMap) of columns_
        if null returned,
          then success and exit

        else:
          save delta (the associated count for the XClock being inserted)
          pull old Column
          use Reconciler to collapse saved delta Column w/ old Column counter clocks:
            e.g. for incremental counters
              i) aggregate this replica's counts
              ii) take max of every other replica's counts

2) read
  CL.ONE read:
    just pull from the first replica that answers

  read repair (used by QUORUM and, in the background, ONE):
    check step:
      read from each replica
      blockFor QUORUM # of replicas
        where one replica is randomly chosen to be non-digest
      check results in service.ReadResponseResolver : resolve()
        compare the digest of the non-digest CF against all digests received
        if they don't match:
          then kick off repair step

    repair step:
      read non-digest from every replica
      blockFor QUORUM # of replicas
      fix results in service.RRR : resolve() + two other methods
        i) assemble all versions of the CF from replicas received
        ii) create a "resolved" CF via CF.resolve()
          CF.resolve(other CF)
            CF.addAll(other CF)
              calls CF.addColumn() for each IColumn in the other CF
        iii) for each version received, create a repair version to be sent to that replica
          repairCF = reconciledCF.diff(versionCF)
          if null,
            skip
          otherwise:
            call repairCF.cleanNodeCounts(replica to repair)
              wipes out all the counts for the given replica in every *CounterClock in the CF
            send RM w/ repairCF under read-repair verb

3) compaction
  uses same CF.addColumn() code path to aggregate Columns across SSTs
    nothing special

4) AES
  uses a modified compaction iterator
    service.AntiEntropyService : doAESCompaction()
      that applies the same code path from read-repair:
        XCounterClock : cleanNodeCounts(InetAddress replica)

      so that the IClock contexts being created to repair the remote replicas
        do not send over the counts for that given replica
{noformat}
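
The IncrementCounterContext rules in the outline above (compare() by timestamp, diff() by per-node tuples) can be sketched roughly as follows. This is an illustration in Python, not the patch's Java code: the CounterContext class, its field names, and the helper is_greater_than() are hypothetical, and "larger" is assumed to mean strictly larger.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class CounterContext:
    """Hypothetical stand-in for an increment counter context."""
    timestamp: int                                        # timestamp of last update
    counts: Dict[str, int] = field(default_factory=dict)  # node id -> count


def compare(a: CounterContext, b: CounterContext) -> int:
    """Timestamp-based compare of the last update: returns -1, 0, or 1."""
    return (a.timestamp > b.timestamp) - (a.timestamp < b.timestamp)


def is_greater_than(a: CounterContext, b: CounterContext) -> bool:
    """Tuple-based check: a has every node that b has, each with a larger count.

    Assumption: "larger" is read as strictly larger.
    """
    return all(node in a.counts and a.counts[node] > count
               for node, count in b.counts.items())


newer = CounterContext(timestamp=20, counts={"n1": 5, "n2": 3, "n3": 1})
older = CounterContext(timestamp=10, counts={"n1": 4, "n2": 2})
```

Here compare(newer, older) is positive because of the later timestamp, and is_greater_than(newer, older) holds because newer covers both of older's nodes with larger counts. The reverse check fails, since older has no entry for n3.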

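The collapse step on insert (step 1c in the outline above: aggregate this replica's counts, take the max of every other replica's counts) might look like the sketch below. It is a Python illustration under simplifying assumptions, not the Reconciler from the patch: contexts are reduced to plain node-id-to-count dicts, timestamps are left out, and the function name reconcile() is hypothetical.

```python
from typing import Dict

Counts = Dict[str, int]  # node id -> count (hypothetical simplification)


def reconcile(local_node: str, old: Counts, delta: Counts) -> Counts:
    """Collapse a saved delta into the old counts, as seen from local_node."""
    merged = dict(old)
    for node, count in delta.items():
        if node == local_node:
            # i) aggregate this replica's own counts
            merged[node] = merged.get(node, 0) + count
        else:
            # ii) take the max of every other replica's counts
            merged[node] = max(merged.get(node, 0), count)
    return merged


old = {"A": 5, "B": 7}
delta = {"A": 2, "B": 4, "C": 1}
merged = reconcile("A", old, delta)
```

With replica A as the local node, merged comes out as {"A": 7, "B": 7, "C": 1}: A's own delta is added, B keeps the larger of 7 and 4, and C is picked up as new.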

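The repair step of read repair (diff the reconciled CF against each replica's version, skip if nothing differs, wipe the target replica's own counts, then send) can be sketched as below. The same wipe is what the AES path applies through cleanNodeCounts(). This is a Python illustration only: contexts are reduced to node-id-to-count dicts, build_repairs() is a hypothetical name, and the rule used in diff() (an entry needs repair when it is missing or smaller on the replica) is an assumption, not taken from the patch.

```python
from typing import Dict, Optional

Counts = Dict[str, int]  # node id -> count (hypothetical simplification)


def diff(reconciled: Counts, version: Counts) -> Optional[Counts]:
    """Return the entries the replica's version lacks, or None if up to date.

    Assumption: an entry needs repair when the replica is missing it or
    holds a smaller count for it.
    """
    missing = {node: count for node, count in reconciled.items()
               if node not in version or version[node] < count}
    return missing or None


def clean_node_counts(repair: Counts, replica: str) -> Counts:
    """Wipe out the counts belonging to the replica being repaired."""
    return {node: count for node, count in repair.items() if node != replica}


def build_repairs(reconciled: Counts, versions: Dict[str, Counts]) -> Dict[str, Counts]:
    """Build one repair payload per replica whose version differs."""
    repairs = {}
    for replica, version in versions.items():
        repair = diff(reconciled, version)
        if repair is None:
            continue  # replica already matches the reconciled version
        repairs[replica] = clean_node_counts(repair, replica)
    return repairs


reconciled = {"A": 7, "B": 7, "C": 1}
versions = {
    "A": {"A": 7, "B": 7, "C": 1},
    "B": {"A": 5, "B": 6},
    "C": {"A": 7, "B": 4, "C": 1},
}
repairs = build_repairs(reconciled, versions)
```

Replica A is skipped because its version already matches. Replica B is sent A's and C's counts but not its own, even though its own count differed, and replica C is sent only B's count.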
> Increment counters
> ------------------
>
>                 Key: CASSANDRA-1072
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-1072
>             Project: Cassandra
>          Issue Type: Sub-task
>          Components: Core
>            Reporter: Johan Oskarsson
>            Assignee: Kelvin Kakugawa
>         Attachments: CASSANDRA-1072.patch, CASSANDRA-1072.patch
>
>
> Break the increment counters out of CASSANDRA-580. Classes are shared between the two features, but without the plain version vector code the changeset becomes smaller and more manageable.
