Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2022/08/23 13:24:43 UTC

[GitHub] [accumulo] m-g-r opened a new issue, #2889: Thousands of ProxyServer updates get silently lost if BatchWriter is closed right after the last update has been sent

m-g-r opened a new issue, #2889:
URL: https://github.com/apache/accumulo/issues/2889

   **Describe the bug**
   
   Having optimized our insertion of data to Accumulo (see
   https://observablehq.com/@m-g-r/almost-600000-entries-per-second-from-lisp-to-accumulo)
   I noticed that the data written was often not complete when deleting
   entries with `deleteCell` mutations. At the same time there were not any
   errors to be seen on the client side nor in any log files.
   
   The problem seems to be caused by a combination of three things of the
   Accumulo Proxy, its Thrift interface but also in the client library of
   Accumulo that is used by the proxy:
   
   1. The methods `flush()`, `close()`, `addMutation()` etc. in the BatchWriter
   of the Accumulo Core client library are all marked "`synchronized`" but the
   shared internal resources themselves, especially the `boolean closed`, the
   `MutationSet mutations`, and the `long` integer `totalMemUsed` are not
   protected from simultaneous use by different threads.
   
   "`Synchronized`" means that `close()` cannot be run at the same time by
   two threads but it still can run while `addMutation()` is running, for
   example.
   
   Here, `addMutation()` can be running and in a waiting state (for background
   jobs to write data to Accumulo) while `close()` is run by a new thread
   which then prevents `addMutation()` from finishing. (More on this
   further down.)
   
   
   3. The update call of the Accumulo Proxy is marked as "`oneway`".
   
   Thus errors cannot be sent back to the client immediately.  Instead,
   if something goes wrong for an update call, the client can only be
   informed by a subsequent call.
   
   The intention seems to be that the `flush` or `closeWriter` calls can
   throw a `MutationsRejectedException`. But this works only if those
   calls are not handled too early. That is, if I send a number of `update`
   calls the client continues without delay as these are `oneway` calls.
   The following `flush` or `closeWriter` will be sent out immediately
   as well. If the threads handling the `update` calls are slower than the
   threads handling the `closeWriter()`, those slow `update` calls cannot
   be handled anymore.
   
   At the same time, as the `close` has happened already, the writer
   cannot be used anymore and the client will never be informed about
   those errors during the late updates.
   
   
   4. Errors during the `update` call are not properly handled and do not
   even lead to log messages.
   
   The reason seems to be that in 2013, when fixing
     ["ACCUMULO-1340 made proxy update call tolerate unknown session ids"](https://issues.apache.org/jira/browse/ACCUMULO-1340)
   the catch clause from `ProxyServer.update()` got changed like this:
   
   ```
   try {
          BatchWriterPlusException bwpe = getWriter(writer);
          addCellsToWriter(cells, bwpe);
   -    } catch (Exception e) {
   -      throw new TException(e);
   +    } catch (UnknownWriter e) {
   +      // just drop it, this is a oneway thrift call and throwing a TException seems to make all subsequent thrift calls fail
        }
      }
   ```
   
   with the side effect that any other exceptions aside from
   `UnknownWriter` are no longer thrown as `TExceptions` either. And Accumulo Proxy
   seems to ignore them aside from writing to stdout or stderr about them.
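   
   One possible way to keep failures of a `oneway` call visible is sketched
   below. This is only an illustration under assumptions: the class
   `OnewayErrorTracker` and its methods `recordFailure` and `rethrowPending`
   are hypothetical names and not part of Accumulo Proxy. The idea is to catch
   every exception in the `update` handler, log it, and remember it so that
   the next two-way call (such as `flush` or `closeWriter`) can report it.
   
   ```java
   import java.util.concurrent.atomic.AtomicReference;
   import java.util.logging.Level;
   import java.util.logging.Logger;
   
   // Hypothetical helper, not part of Accumulo Proxy: remembers the first
   // failure seen in a oneway call so a later two-way call can report it.
   class OnewayErrorTracker {
     private static final Logger LOG = Logger.getLogger(OnewayErrorTracker.class.getName());
     private final AtomicReference<Exception> firstFailure = new AtomicReference<>();
   
     // Would be called from the catch block of the oneway handler (e.g. update).
     void recordFailure(Exception e) {
       LOG.log(Level.SEVERE, "update failed; will be reported on next flush/close", e);
       firstFailure.compareAndSet(null, e); // keep only the first failure
     }
   
     // Would be called at the start of a two-way handler (e.g. flush, closeWriter).
     void rethrowPending() {
       Exception e = firstFailure.getAndSet(null);
       if (e != null) {
         throw new IllegalStateException("an earlier oneway update failed", e);
       }
     }
   }
   ```
   
   Such a tracker only helps if the two-way call is processed after the failing
   `update`. It does not cover the case described above, where `closeWriter` is
   handled before the late updates.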
   
   
   I only saw the reason for our dropped mutations when running the
   Accumulo Proxy in the foreground:
   
   ```
   2022-08-08 13:55:05,897 [thrift.ProcessFunction] ERROR: Internal error processing update
   java.lang.IllegalStateException: Closed
           at org.apache.accumulo.core.clientImpl.TabletServerBatchWriter.addMutation(TabletServerBatchWriter.java:243)
           at org.apache.accumulo.core.clientImpl.BatchWriterImpl.addMutation(BatchWriterImpl.java:44)
           at org.apache.accumulo.proxy.ProxyServer.addCellsToWriter(ProxyServer.java:1389)
           at org.apache.accumulo.proxy.ProxyServer.update(ProxyServer.java:1453)
           at jdk.internal.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
           at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.base/java.lang.reflect.Method.invoke(Method.java:567)
           at org.apache.accumulo.core.trace.TraceUtil.lambda$wrapService$8(TraceUtil.java:232)
           at com.sun.proxy.$Proxy9.update(Unknown Source)
           at org.apache.accumulo.proxy.thrift.AccumuloProxy$Processor$update.getResult(AccumuloProxy.java:9652)
           at org.apache.accumulo.proxy.thrift.AccumuloProxy$Processor$update.getResult(AccumuloProxy.java:9633)
           at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38)
           at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
           at org.apache.accumulo.server.rpc.TimedProcessor.process(TimedProcessor.java:61)
           at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:518)
           at org.apache.accumulo.server.rpc.CustomNonBlockingServer$CustomFrameBuffer.invoke(CustomNonBlockingServer.java:112)
           at org.apache.thrift.server.Invocation.run(Invocation.java:18)
           at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
           at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
           at org.apache.accumulo.fate.util.LoggingRunnable.run(LoggingRunnable.java:35)
           at java.base/java.lang.Thread.run(Thread.java:830)
   ```
   Alas, the client code thinks all went well and continues to run as if no error had happened.
   
   **Versions (OS, Maven, Java, and others, as appropriate):**
    - Affected version(s) of this project: Apache Accumulo Version 2.0.1
    - Others: Accumulo Proxy from https://github.com/apache/accumulo-proxy as of 2022-04-01
   
   **To Reproduce**
   
   I have written a few small test cases to check the severity of the problem
   but as they are written in Common Lisp they will probably not be of
   help to you. I describe them instead.
   
   First, I add a number of simple entries to Accumulo (just numbers
   as key and value), then I count. Afterwards I try to delete all entries,
   and count again to check whether the deletion was successful.
   
   I do this deletion with a batch scanner over all entries, creating
   a simple update mutation with a `ColumnUpdate` with `deleteCell` `true` for
   each row found by the scanner. I send the updates to Accumulo with
   a writer. After the last update call I explicitly call flush and then
   close the writer. In Lisp this deletion function looks like this:
   ```
   (defun delete-all-values (table-name &key (k *scanner-next-k-entries*))
     ;; use a separate connection for the scanner, to make it as quick as doing the updates after the scanning
     (accumulo-client:with-connection (writer-connection)
       (accumulo-client:with-connection (accumulo-client:*connection*)
         (let ((writer (raccumulo.i::create-writer table-name writer-connection)))
           (unwind-protect
                (accumulo-client:with-scanner (scanner table-name)
                                              (:batch-scanner-p t :threads *scanner-threads*)
                  (loop for (entries more-p) = (multiple-value-list (accumulo-client:scanner-next-k-entries scanner :k k))
                        do (dolist (key-value entries)
                             (let* ((key (accumulo:keyvalue-key key-value))
                                    (row (accumulo:key-row key)))
                               (accumulo.accumulo-proxy:update (accumulo-client:connection-client writer-connection)
                                                               writer
                                                               (thrift:map row
                                                                           (thrift:list
                                                                            (accumulo:make-columnupdate :delete-cell t))))))
                        while more-p))
             (raccumulo.i::flush-writer writer)
             (raccumulo.i::close-writer writer))))))
   ```
   
   The test function is:
   
   ```
     (defun test (&optional (count 1000))
       (delete-entries :check-at-end t)
       (count-entries)
       (insert-entries count)
       (format t "~&inserted: ~d~%" (count-entries))
       (delete-entries)
       (let* ((num (count-entries))
              (succ (zerop num)))
         (format t "after deletion: ~d~%" num)
         (format t "~a~&" (if succ :successful :failed))
         (values succ num count)))
   ```
   
   And then a loop to do it a number of times is:
   
   ```
     (defun test-loop (&optional (times 3) (count 1000))
       (every #'identity
              (loop for i from 0 below times
                    do (format t "~&~%round: ~d" i)
                    collect (test count))))
   ```
   
   When I call it to make 10 rounds with 100.000 entries each, the outcome is:
   
   ```
     round: 0
     inserted: 100000
     after deletion: 15381
     FAILED
     
     round: 1
     inserted: 100000
     after deletion: 13338
     FAILED
     
     round: 2
     inserted: 100000
     after deletion: 18683
     FAILED
     
     round: 3
     inserted: 100000
     after deletion: 14296
     FAILED
     
     round: 4
     inserted: 100000
     after deletion: 9983
     FAILED
     
     round: 5
     inserted: 100000
     after deletion: 16286
     FAILED
     
     round: 6
     inserted: 100000
     after deletion: 12158
     FAILED
     
     round: 7
     inserted: 100000
     after deletion: 18712
     FAILED
     
     round: 8
     inserted: 100000
     after deletion: 10087
     FAILED
     
     round: 9
     inserted: 100000
     after deletion: 18290
     FAILED
   ```
     
   Each time several thousand entries stay in the table.
   In the best case "only" 9.983 and in the worst case even 18.712.
   
   The Accumulo Proxy displays 37 times "`ERROR: Internal error processing
   update java.lang.IllegalStateException: Closed`" during that call.
   Full result attached: [20220808-tests-oneway_again-with-errors.txt](https://github.com/apache/accumulo/files/9402479/20220808-tests-oneway_again-with-errors.txt)
   
   I wrote another very simple test function to see how many updates I can
   send at a time without getting a fault:
   
   ```
     (defun meta-test-loop (&optional (times 3) (max 100) (step 1))
       (every #'identity
              (loop for i from 0 below max by step
                    do (format t "~&~%meta round: ~d" i)
                    collect (test-loop times i) into result
                    do (format t "~&~%meta round: ~d, result: ~a" i result)
                    finally (return result))))
   ```
   
   I called it as "`(meta-test-loop 10 10 1)`", that is, step from 0 up to
   (but excluding) 10 and write that number of entries 10 times. Already in
   round 5 it failed once. In round 6 it failed six times, in round 9 it
   failed 8 times out of 10.  Full result attached: [20220808-tests-oneway_again-with-errors2.txt](https://github.com/apache/accumulo/files/9402480/20220808-tests-oneway_again-with-errors2.txt)
   
   **Workarounds**
   
   When I add a delay of at least a few hundred milliseconds before `closeWriter`
   the problem starts to vanish. But as I do not receive any errors
   during an update because of problem 3 above, I can never be sure if it
   really succeeded. If the machine is under heavy load, the delay needed might change.
   
   For a delete it is simple: I can count the entries at the end and if
   the number is not zero, I need to wait longer. That is what I have
   implemented in the function "`(delete-entries :check-at-end t)`".
   But for more complex mutations, this is not feasible, as basically
   all mutated data would need to be retrieved from the server and checked
   explicitly.
   
   The only easy workaround was to change the update call not to be `oneway`
   anymore and recompile the Java and Common Lisp Thrift interface of
   the Accumulo Proxy and then build a new Accumulo Proxy. With that change
   I do not see any errors anymore and all deletions are successful. The
   tests above as "`(test-loop 10 100000)`" run without any errors at all.
   
   But that comes with a severe drop in performance: instead of 600,000
   entries per second for my benchmark I get only 250,000. Other more
   complex import tasks take 19 hours instead of 3.
   
   
   **More on the flush operation of the `BatchWriter` and analysis**
   
   The `flush` operation as implemented in `BatchWriter` in `close()` just
   waits until all work stored in the `MutationSet` has been handled by the
   mutation writer background threads.
   
   This might be good enough for an in-between flush but not if you want
   to `close()` and thus terminate or shut down the writer. There might be
   threads adding to the mutations at that very moment.
   
   
   This code is in the core of Accumulo in the file:
     https://github.com/apache/accumulo/blob/rel/2.0.1/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
   There is a longer comment at the beginning on how it operates.
   
   It just looks at the memory usage of the mutations, which is computed and
   updated. Each added mutation increases it by an estimate; each time
   a mutation is sent to the server it is reduced by the bytes sent.
   
   `flush()` or `close()` just waits while "`totalMemUsed > 0 && !somethingFailed`"
   holds true, and assumes afterwards that all work is done. This would
   usually be the case when `totalMemUsed` reaches zero.
   
   `addMutation()` increases `totalMemUsed` in the line:
   ```
         totalMemUsed += m.estimatedMemoryUsed();
   ```
   but that line is quite late in the function and the counter does not seem
   to be protected against use by threads running in parallel. The
   functions `flush()`, `close()`, `addMutation()` etc. are all marked
   "`synchronized`" but that still means `close()` can run while `addMutation()`
   is running.
   
   When I write 100.000 entries to Accumulo in one go, I expect there to
   be quite a number of threads running `addMutation()` which would wait
   in the line
   ```
       waitRTE(() -> (totalMemUsed > maxMem || flushing) && !somethingFailed);
   ```
   
   But at the end when `close()` is called, `close` immediately sets
   ```
         closed = true;
   ```
   which then triggers the check in `addMutation()` just following the
   `waitRTE()` above:
   
   ```
       // do checks again since things could have changed while waiting and not holding lock
       if (closed)
         throw new IllegalStateException("Closed");
   ```
   
   And that leads to the observed "`java.lang.IllegalStateException: Closed`"
   as reported by Accumulo Proxy.
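   
   The sequence above can be reproduced with a small stand-alone model. It is
   a sketch under assumptions, not the Accumulo source: `ModelWriter`,
   `CloseRaceDemo` and the `aboutToWait` latch are hypothetical names, and the
   background writer threads are replaced by `close()` simply resetting the
   counter. The model relies on one fact of Java: a thread that calls `wait()`
   inside a `synchronized` method releases the object's monitor until it is
   notified, so another `synchronized` method such as `close()` can run in
   between.
   
   ```java
   import java.util.concurrent.CountDownLatch;
   
   // Simplified model of the pattern described above; NOT the Accumulo source.
   class ModelWriter {
     private final long maxMem;
     private long totalMemUsed = 0;
     private boolean closed = false;
     // Test hook (hypothetical): signals that a caller is about to wait.
     final CountDownLatch aboutToWait = new CountDownLatch(1);
   
     ModelWriter(long maxMem) {
       this.maxMem = maxMem;
     }
   
     synchronized void addMutation(long size) {
       if (closed)
         throw new IllegalStateException("Closed");
       while (totalMemUsed > maxMem) {
         aboutToWait.countDown();
         try {
           wait(); // releases the monitor, so close() can now enter
         } catch (InterruptedException e) {
           throw new RuntimeException(e);
         }
       }
       // re-check: things may have changed while waiting without the lock
       if (closed)
         throw new IllegalStateException("Closed");
       totalMemUsed += size;
     }
   
     synchronized void close() {
       closed = true;
       totalMemUsed = 0; // stands in for background threads draining the buffer
       notifyAll();
     }
   }
   
   class CloseRaceDemo {
     // Returns "accepted" or the message of the exception the late caller saw.
     static String run() {
       ModelWriter writer = new ModelWriter(10);
       writer.addMutation(11); // buffer is now over the limit
       String[] outcome = {"accepted"};
       Thread late = new Thread(() -> {
         try {
           writer.addMutation(1); // blocks in wait() because the buffer is full
         } catch (IllegalStateException e) {
           outcome[0] = e.getMessage();
         }
       });
       late.start();
       try {
         writer.aboutToWait.await();
         writer.close(); // acquires the monitor only once the late caller waits
         late.join();
       } catch (InterruptedException e) {
         throw new RuntimeException(e);
       }
       return outcome[0];
     }
   }
   ```
   
   In this model `CloseRaceDemo.run()` returns `"Closed"`: the late
   `addMutation()` call was accepted by the method, waited for buffer space,
   and was then rejected because `close()` ran while it was waiting.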
   
   Hm, it is really just the flag "`closed`" that causes this problem.
   But the waiting by the line
   ```
      waitRTE(() -> totalMemUsed > 0 && !somethingFailed);
   ```
   in `close()` is also not enough to make sure that no other thread is
   already adding more work in `addMutation()`, as it may have got past the second
   "`if (closed)`" check and handled the mutation already before increasing
   the memory counter.
   
   
   This all seems rather thread-unsafe. The precautions are not effective.
   In addition to this, it would be good if the client of the Accumulo Proxy
   also had a chance to test if all work was done. For example, by `flush`
   returning the number of mutations processed.
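   
   A minimal sketch of that idea follows. It is purely hypothetical: the
   existing proxy `flush` call returns nothing, and `CountingWriter` with its
   methods is an invented stand-in used only to show how a client could
   compare the number of updates it sent with the number the server reports.
   
   ```java
   import java.util.concurrent.atomic.AtomicLong;
   
   // Hypothetical stand-in for the server side of a writer; not the proxy API.
   class CountingWriter {
     private final AtomicLong processed = new AtomicLong();
     private volatile boolean closed = false;
   
     // Models a oneway update: a failure is invisible to the caller.
     void update(String row) {
       if (closed) {
         return; // dropped without any feedback, as in the reported bug
       }
       processed.incrementAndGet();
     }
   
     // Proposed behaviour: report how many updates were actually processed.
     long flush() {
       return processed.get();
     }
   
     void close() {
       closed = true;
     }
   }
   ```
   
   A client that keeps its own count of sent updates could then detect a
   mismatch and react, for example by resending, instead of assuming success.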
   
   I have no idea why this is not a problem for others. Is it not?
   The Common Lisp implementation code for Thrift compiles to native
   machine code, which runs efficiently, while having something delay the
   `close` just a little bit often alleviates the problem. But the problem should
   also exhibit itself when using the Java client library alone, that is, without
   the Accumulo Proxy (as long as one does not explicitly manage all threads
   oneself and makes sure that `close()` is never run as long as there are
   threads that might call `addMutation()`). Strange.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [accumulo] dlmarion commented on issue #2889: Thousands of ProxyServer updates get silently lost if BatchWriter is closed right after the last update has been sent

Posted by GitBox <gi...@apache.org>.
dlmarion commented on issue #2889:
URL: https://github.com/apache/accumulo/issues/2889#issuecomment-1224186269

   I'm going to transfer this issue to the accumulo-proxy project.




[GitHub] [accumulo] dlmarion commented on issue #2889: Thousands of ProxyServer updates get silently lost if BatchWriter is closed right after the last update has been sent

Posted by GitBox <gi...@apache.org>.
dlmarion commented on issue #2889:
URL: https://github.com/apache/accumulo/issues/2889#issuecomment-1224158989

   A lot of information here, thanks for the detailed explanation. I have not read through all of it yet, but I wanted to make an observation about this:
   
   > The methods flush(), close(), addMutation() etc. in the BatchWriter
   of the Accumulo Core client library are all marked "synchronized" but the
   shared internal resources itself, especially the boolean closed, the
   MutationSet mutations, and the long integer totalMemUsed are not
   protected from simultaneous use by different threads.
   "Synchronized" means that close() cannot be run at the same time by
   two threads but it still can run while addMutation() is runnig, for
   example.
   > Here, addMutation() can be running and in a waiting state (for background
   jobs to write data to Accumulo) while close() is run by a new thread
   which then prevents addMutation() from finishing. (More on this
   further down.)
   
   If I were to write a program in Java that had multiple threads writing to a batch writer, *only* addMutation would be called from the threads. The main thread of the program would wait for all of the other threads to finish, then the main thread would call close.
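   
   That pattern could look roughly like the following sketch. `StubWriter` and
   `JoinThenClose` are hypothetical names, and the stub stands in for a real
   batch writer so that the example runs without Accumulo. The worker threads
   only add mutations, and the calling thread waits for all of them before it
   closes the writer.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   
   // Stand-in for a batch writer so the sketch runs without Accumulo.
   class StubWriter implements AutoCloseable {
     private final List<String> rows = new ArrayList<>();
     private boolean closed = false;
   
     synchronized void addMutation(String row) {
       if (closed)
         throw new IllegalStateException("Closed");
       rows.add(row);
     }
   
     @Override
     public synchronized void close() {
       closed = true;
     }
   
     synchronized int size() {
       return rows.size();
     }
   }
   
   class JoinThenClose {
     // Returns the number of mutations the writer accepted.
     static int writeAll(int threads, int perThread) {
       StubWriter writer = new StubWriter();
       ExecutorService pool = Executors.newFixedThreadPool(threads);
       for (int t = 0; t < threads; t++) {
         final int id = t;
         pool.submit(() -> {
           for (int i = 0; i < perThread; i++) {
             writer.addMutation(id + ":" + i); // workers only add mutations
           }
         });
       }
       pool.shutdown(); // accept no new tasks, let submitted ones finish
       try {
         if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
           throw new IllegalStateException("writers did not finish in time");
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(e);
       }
       writer.close(); // only now, after every worker is done
       return writer.size();
     }
   }
   ```
   
   With a `oneway` proxy call the client has no equivalent of waiting for the
   workers, because it never learns when the server has finished an update.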

