Posted to dev@accumulo.apache.org by adamjshook <gi...@git.apache.org> on 2017/04/27 19:35:52 UTC

[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

GitHub user adamjshook opened a pull request:

    https://github.com/apache/accumulo/pull/254

    [ACCUMULO-4506] Add a timeout to the replication task

    This addresses an issue where a replication task can get stuck for a
    substantial amount of time.  With a timeout, the stuck task is abandoned,
    allowing another worker (or the same worker) to pick it up.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/adamjshook/accumulo 1.7

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/accumulo/pull/254.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #254
    

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114575478
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -536,8 +570,21 @@ public ReplicationStats execute(Client client) throws Exception {
     
           // If we have some edits to send
           if (0 < edits.walEdits.getEditsSize()) {
    +        // Check if we are interrupted before to writing the edits
    +        if (Thread.interrupted()) {
    +          log.debug("Replication work interrupted before writing edits, returning empty replication stats");
    +          return new ReplicationStats(0L, 0L, 0L);
    +        }
    +
             log.debug("Sending {} edits", edits.walEdits.getEditsSize());
             long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds);
    --- End diff --
    
    Not sure how to go about setting the client's timeout -- not seeing anything in the API directly.  Digging in a bit I see that `ReplicationClient#getServicerConnection` is using `ThriftUtil#getClientNoTimeout`?  Maybe that is all that needs to change and this entire thing is moot.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114381575
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -416,6 +444,12 @@ protected Status replicateLogs(ClientContext peerContext, final HostAndPort peer
         Status lastStatus = status, currentStatus = status;
         final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
         while (true) {
    +      // Check if the thread has been interrupted prior to replicating data
    +      if (Thread.interrupted()) {
    --- End diff --
    
    I am seeing the interrupt being caught by this thread and properly handling the timeout.  I can add a boolean regardless, though, if that is the consensus.



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on the issue:

    https://github.com/apache/accumulo/pull/254
  
    Thanks, @adamjshook.
    
    > Not too sure how to go about adding a unit or integration test for this timeout
    
    Would it help to refactor this code to make it more unit-testable? I think creating a "good" integration test for this change would be hard -- I'd be happy with a unit test, likely supported by a mocking framework.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114387450
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -416,6 +444,12 @@ protected Status replicateLogs(ClientContext peerContext, final HostAndPort peer
         Status lastStatus = status, currentStatus = status;
         final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
         while (true) {
    +      // Check if the thread has been interrupted prior to replicating data
    +      if (Thread.interrupted()) {
    --- End diff --
    
    I can't be sure if it's needed.  It's a question of whether all of the functions called by this code handle interrupts properly. I have no idea what the answer to that question is.
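
One reason the question above is hard to answer: `Thread.interrupted()` both reads and clears the calling thread's interrupt status, so any callee that invokes it (or catches `InterruptedException` without re-interrupting) hides the interrupt from later checks. A minimal sketch of that behavior, using a hypothetical `InterruptFlagDemo` class that is not part of the pull request:

```java
final class InterruptFlagDemo {
  /**
   * Returns three observations, in order: isInterrupted() after self-interrupt,
   * the result of Thread.interrupted(), and isInterrupted() afterwards.
   */
  static boolean[] observe() {
    Thread current = Thread.currentThread();
    current.interrupt();                       // set the interrupt status on ourselves
    boolean peek = current.isInterrupted();    // reads the status, leaves it set
    boolean consumed = Thread.interrupted();   // reads the status AND clears it
    boolean after = current.isInterrupted();   // status is now cleared
    return new boolean[] {peek, consumed, after};
  }
}
```

If a helper deeper in the call stack consumes the status this way, a `Thread.interrupted()` check in the outer loop would not fire, which is the case a separate boolean flag would cover.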



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r113838618
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -288,15 +295,36 @@ public String execute(ReplicationCoordinator.Client client) throws Exception {
                 }
               } else {
                 span = Trace.start("WAL replication");
    +
    +            ExecutorService executor = Executors.newFixedThreadPool(1);
    +
                 try {
    -              finalStatus = replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +              Future<Status> replStatus = executor.submit(new Callable<Status>() {
    +                @Override
    +                public Status call() throws Exception {
    +                  return replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +                }
    +              });
    +
    +              log.debug("Getting replication status with timeout {}", conf.get(Property.REPLICATION_TIMEOUT));
    +              finalStatus = replStatus.get(conf.getTimeInMillis(Property.REPLICATION_TIMEOUT), MILLISECONDS);
    +              log.debug("New status for {} after replicating to {} is {}", p, peerContext.getInstance(), ProtobufUtil.toString(finalStatus));
    +            } catch (InterruptedException e) {
    +              log.debug("Interrupted exception during replication", e);
    +              Thread.currentThread().interrupt();
    +              finalStatus = status;
    +            } catch (ExecutionException e) {
    +              log.warn("Caught execution exception", e);
    +              finalStatus = status;
    +            } catch (TimeoutException e) {
    +              log.debug("Replication timeout triggered, shutting down");
    --- End diff --
    
    This will be implicitly retried by the framework, right? We should give some indication of that in the message.



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on the issue:

    https://github.com/apache/accumulo/pull/254
  
    > the replicate call takes in the ReplicaSystemHelper as an argument and I wasn't too sure how to go about creating an instance of that
    
    Nothing special, just instantiate it. It requires a `ClientContext` which you can also mock (the implementation just uses that to get at a `Connector`).
    
    It may make your life easier to break up `_replicate(..)` further - I think that would be good if it makes your testing even easier.
    
    ```java
    } else {
      span = Trace.start("WAL replication");
      try {
        finalStatus = replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
      } finally {
        span.stop();
      }
    }
    ```
    
    This section is most of what you changed. What if you lift this into its own method, and then use that as the entry-point to test? Something like `replicateLogsWithTimeout()`..
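
A rough sketch of what such a lifted, independently testable method could look like. The class `TimeoutRunner` and method `callWithTimeout` are hypothetical names for illustration; the real method would call `replicateLogs(...)` and return a `Status`, both of which are replaced here by a generic `Callable<T>` so the example is self-contained:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutRunner {
  /**
   * Runs the task on a dedicated thread and waits at most timeoutMillis.
   * Returns fallback if the task times out, throws, or the caller is interrupted.
   */
  static <T> T callWithTimeout(Callable<T> task, long timeoutMillis, T fallback) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<T> future = executor.submit(task);
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for callers
      return fallback;
    } catch (ExecutionException | TimeoutException e) {
      return fallback;                    // mirrors "finalStatus = status" in the diff
    } finally {
      executor.shutdownNow();             // interrupts the worker if it is still running
    }
  }
}
```

A unit test can then pass in a task that sleeps longer than the timeout and assert that the fallback comes back, without needing a running peer.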



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r113839731
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -288,15 +295,36 @@ public String execute(ReplicationCoordinator.Client client) throws Exception {
                 }
               } else {
                 span = Trace.start("WAL replication");
    +
    +            ExecutorService executor = Executors.newFixedThreadPool(1);
    +
                 try {
    -              finalStatus = replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +              Future<Status> replStatus = executor.submit(new Callable<Status>() {
    +                @Override
    +                public Status call() throws Exception {
    +                  return replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +                }
    +              });
    +
    +              log.debug("Getting replication status with timeout {}", conf.get(Property.REPLICATION_TIMEOUT));
    +              finalStatus = replStatus.get(conf.getTimeInMillis(Property.REPLICATION_TIMEOUT), MILLISECONDS);
    +              log.debug("New status for {} after replicating to {} is {}", p, peerContext.getInstance(), ProtobufUtil.toString(finalStatus));
    +            } catch (InterruptedException e) {
    +              log.debug("Interrupted exception during replication", e);
    +              Thread.currentThread().interrupt();
    +              finalStatus = status;
    +            } catch (ExecutionException e) {
    +              log.warn("Caught execution exception", e);
    +              finalStatus = status;
    +            } catch (TimeoutException e) {
    +              log.debug("Replication timeout triggered, shutting down");
    +              finalStatus = status;
                 } finally {
                   span.stop();
    +              executor.shutdownNow();
                 }
               }
     
    -          log.debug("New status for {} after replicating to {} is {}", p, peerContext.getInstance(), ProtobufUtil.toString(finalStatus));
    --- End diff --
    
    It's moved up to the new line 311 inside the try/catch.



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to a replication RPC call

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on the issue:

    https://github.com/apache/accumulo/pull/254
  
    @joshelser @keith-turner Updated using the new method.  Let me know if setting that config property isn't the best way to do it.  Tested it on my pair of VMs and the client timeout is throwing an exception which is caught by the `AccumuloReplicaSystem`.  The replication is then tried again up to `replication.work.attempts` before being abandoned entirely.
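
The retry behavior described above (an exception from the timed-out call, then another attempt, up to a configured limit) can be pictured with a small loop. This is only an illustration under assumed names (`BoundedRetry`, `run`); it is not how the Accumulo work queue implements `replication.work.attempts`:

```java
import java.util.concurrent.Callable;

final class BoundedRetry {
  /**
   * Calls attempt until it returns true or maxAttempts is reached.
   * Returns the number of attempts used, or -1 if every attempt failed.
   */
  static int run(Callable<Boolean> attempt, int maxAttempts) {
    for (int i = 1; i <= maxAttempts; i++) {
      try {
        if (attempt.call()) {
          return i;
        }
      } catch (Exception e) {
        // A timed-out RPC would surface here; fall through and try again.
      }
    }
    return -1; // abandoned after maxAttempts
  }
}
```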



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on the issue:

    https://github.com/apache/accumulo/pull/254
  
    > I've been reading up on Executors#sameThreadExecutor, and I don't think we can use this here since it runs the task prior to getting any Future. The timeout logic would not kick in until the task is finished. The task has to be run in a separate thread while the main thread idles to trigger the timeout.
    
    Ahh, bummer. Thought I could be tricky and help save us yet another thread ;)
    
    > Either way the test passes since the Status returned is the same, but it isn't a very good test since the behavior isn't really defined. I think we are a bit too high-level here for a unit test.
    
    I would be happy with a test that is not "human-level". That is, if you can write a test showing that your interrupt/retry logic works, I would +1 that.



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on the issue:

    https://github.com/apache/accumulo/pull/254
  
    @joshelser Applied your feedback to the commit.  I'll have to run it through my test later, though -- away from my PC at the moment.



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to a replication RPC call

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on the issue:

    https://github.com/apache/accumulo/pull/254
  
    Alright, updated again.



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on the issue:

    https://github.com/apache/accumulo/pull/254
  
    I've been reading up on `Executors#sameThreadExecutor`, and I don't think we can use this here since it runs the task prior to getting any `Future`.  The timeout logic would not kick in until the task is finished.  The task has to be run in a separate thread while the main thread idles to trigger the timeout.
    
    Additionally, I'm having a bit of trouble getting a unit test I am pleased with.  I've made one that runs, but there is a race condition there where it'll either not run the task at all or the task runs but it throws and logs a `Connection refused` error because there is no server running to receive any work. You can take a look at the commit [here](https://github.com/adamjshook/accumulo/commit/e0c9ca76982aaa327dba0084e82b339383dfd09c).
    
    Either way the test passes since the `Status` returned is the same, but it isn't a very good test since the behavior isn't really defined.  I think we are a bit too high-level here for a unit test.
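
The same-thread problem described above can be reproduced with the JDK alone. The sketch below does not use the executor named in the comment; it substitutes a hand-written `InlineExecutor` (a hypothetical stand-in) that runs each task on the submitting thread, and shows that `submit()` only returns once the task has finished, so a later `Future.get(timeout, unit)` has nothing left to time out on:

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class SameThreadDemo {
  /** Minimal executor that runs every task inline on the submitting thread. */
  static final class InlineExecutor extends AbstractExecutorService {
    private volatile boolean shutdown;

    @Override public void execute(Runnable command) { command.run(); }
    @Override public void shutdown() { shutdown = true; }
    @Override public List<Runnable> shutdownNow() { shutdown = true; return Collections.emptyList(); }
    @Override public boolean isShutdown() { return shutdown; }
    @Override public boolean isTerminated() { return shutdown; }
    @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return true; }
  }

  /** Returns true if the Future was already complete when submit() returned. */
  static boolean doneAtSubmit(long taskMillis) {
    InlineExecutor executor = new InlineExecutor();
    // submit() blocks for taskMillis because the task runs on this thread.
    Future<String> future = executor.submit(() -> {
      Thread.sleep(taskMillis);
      return "done";
    });
    return future.isDone();
  }
}
```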



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to a replication R...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114616296
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/conf/Property.java ---
    @@ -584,6 +584,8 @@
       REPLICATION_WORK_PROCESSOR_PERIOD("replication.work.processor.period", "0s", PropertyType.TIMEDURATION,
           "Amount of time to wait before re-checking for replication work, not useful outside of tests"),
       REPLICATION_TRACE_PERCENT("replication.trace.percent", "0.1", PropertyType.FRACTION, "The sampling percentage to use for replication traces"),
    +  REPLICATION_RPC_TIMEOUT("replication.rpc.timeout", "2m", PropertyType.TIMEDURATION,
    --- End diff --
    
    If this is done for 1.7, then it's a behavior change in replication for a bug fix release.  I think it's OK and good, but we need to remember to mention this in the 1.7 release notes.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114575274
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -362,6 +400,7 @@ protected Status replicateRFiles(ClientContext peerContext, final HostAndPort pe
       protected Status replicateLogs(ClientContext peerContext, final HostAndPort peerTserver, final ReplicationTarget target, final Path p, final Status status,
           final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper, final UserGroupInformation accumuloUgi)
           throws TTransportException, AccumuloException, AccumuloSecurityException {
    +    stopReplication.set(false);
    --- End diff --
    
    Is it possible for multiple threads to call this method concurrently?  If so, may want to make stopReplication a local var so that each thread gets its own atomic boolean.
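
To illustrate the concern, assuming one object were shared between concurrent callers: a stop flag held in a field is reset for everyone each time the method starts, while a flag created per call is not. The class `StopFlagDemo` and its methods are hypothetical and only mimic the `stopReplication.set(false)` line from the diff:

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class StopFlagDemo {
  // Field-level flag: one instance shared by every caller of sharedStart().
  private final AtomicBoolean sharedStop = new AtomicBoolean(false);

  /** Mimics resetting a shared field at the top of the method. */
  AtomicBoolean sharedStart() {
    sharedStop.set(false); // also wipes out a stop request made for an earlier call
    return sharedStop;
  }

  /** Alternative: each invocation owns its own stop signal. */
  static AtomicBoolean localStart() {
    return new AtomicBoolean(false);
  }
}
```

With the shared field, a second call silently clears a stop request made for the first; with the local flag the two calls cannot affect each other.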



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114581580
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -536,8 +570,21 @@ public ReplicationStats execute(Client client) throws Exception {
     
           // If we have some edits to send
           if (0 < edits.walEdits.getEditsSize()) {
    +        // Check if we are interrupted before to writing the edits
    +        if (Thread.interrupted()) {
    +          log.debug("Replication work interrupted before writing edits, returning empty replication stats");
    +          return new ReplicationStats(0L, 0L, 0L);
    +        }
    +
             log.debug("Sending {} edits", edits.walEdits.getEditsSize());
             long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds);
    --- End diff --
    
    May still need some overall logic.  If there are multiple client/RPC calls, the sum of them could exceed the replication timeout.  For example if there are three RPC calls and each takes 9 mins and the replication timeout is 10 min, then it could run for 27 min without timing out.
    
    Could possibly set the RPC timeout for each call to replication_timeout - time_consumed_so_far.
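
The arithmetic behind this suggestion, as a small sketch: with a 10 minute limit applied per call, three 9 minute calls total 3 × 9 = 27 minutes, whereas a shrinking budget gives the second call only 10 − 9 = 1 minute. `TimeoutBudget` and `remainingMillis` are hypothetical names, not part of the pull request:

```java
final class TimeoutBudget {
  /** Time left for the next call, in milliseconds; never negative. */
  static long remainingMillis(long totalTimeoutMillis, long elapsedMillis) {
    return Math.max(0L, totalTimeoutMillis - elapsedMillis);
  }
}
```

A caller would need to treat a result of 0 as "budget exhausted" rather than passing it on as a timeout, since some APIs (for example `java.net.Socket#setSoTimeout`) interpret 0 as no timeout at all.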



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to a replication R...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114604524
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -161,7 +164,16 @@ public void configure(String configuration) {
       @Override
       public Status replicate(final Path p, final Status status, final ReplicationTarget target, final ReplicaSystemHelper helper) {
         final Instance localInstance = HdfsZooInstance.getInstance();
    -    final AccumuloConfiguration localConf = new ServerConfigurationFactory(localInstance).getConfiguration();
    +    AccumuloConfiguration instanceConf = new ServerConfigurationFactory(localInstance).getConfiguration();
    +
    +    // Overwrite RPC timeout value used by ClientContext with the replication timeout value
    +    Map<String,String> confMap = new HashMap<>();
    +    instanceConf.getProperties(confMap, Predicates.<String> alwaysTrue());
    +    confMap.put(Property.GENERAL_RPC_TIMEOUT.getKey(), instanceConf.get(Property.REPLICATION_RPC_TIMEOUT));
    --- End diff --
    
    Hrm, that's annoying. I was just expecting the timeout value to be passed in as an argument to `ThriftUtil`.
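
For readers following the diff, the override amounts to copying the instance properties and replacing one key. A self-contained sketch with plain maps instead of `AccumuloConfiguration`: the key `replication.rpc.timeout` comes from the `Property.java` diff in this thread, while `general.rpc.timeout` is an assumed key name for `Property.GENERAL_RPC_TIMEOUT`, and `TimeoutOverride` is a hypothetical class:

```java
import java.util.HashMap;
import java.util.Map;

final class TimeoutOverride {
  /**
   * Returns a copy of the properties in which the general RPC timeout
   * is replaced by the replication RPC timeout, when one is set.
   */
  static Map<String,String> withReplicationTimeout(Map<String,String> instanceProps) {
    Map<String,String> copy = new HashMap<>(instanceProps); // leave the original untouched
    String replicationTimeout = copy.get("replication.rpc.timeout");
    if (replicationTimeout != null) {
      copy.put("general.rpc.timeout", replicationTimeout); // assumed key name
    }
    return copy;
  }
}
```

Passing the timeout as an explicit argument, as suggested above, would avoid rewriting configuration just to change one value.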



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114597621
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -536,8 +570,21 @@ public ReplicationStats execute(Client client) throws Exception {
     
           // If we have some edits to send
           if (0 < edits.walEdits.getEditsSize()) {
    +        // Check if we are interrupted before to writing the edits
    +        if (Thread.interrupted()) {
    +          log.debug("Replication work interrupted before writing edits, returning empty replication stats");
    +          return new ReplicationStats(0L, 0L, 0L);
    +        }
    +
             log.debug("Sending {} edits", edits.walEdits.getEditsSize());
             long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds);
    --- End diff --
    
    @adamjshook I see your POV now.  It would help to accurately document what the replication timeout means: that it's the maximum time any individual RPC related to replication is allowed.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114585046
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -536,8 +570,21 @@ public ReplicationStats execute(Client client) throws Exception {
     
           // If we have some edits to send
           if (0 < edits.walEdits.getEditsSize()) {
    +        // Check if we are interrupted before to writing the edits
    +        if (Thread.interrupted()) {
    +          log.debug("Replication work interrupted before writing edits, returning empty replication stats");
    +          return new ReplicationStats(0L, 0L, 0L);
    +        }
    +
             log.debug("Sending {} edits", edits.walEdits.getEditsSize());
             long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds);
    --- End diff --
    
    > May still need some overall logic. If there are multiple client/RPC calls, the sum of them could exceed the replication timeout. For example if there are three RPC calls and each takes 9 mins and the replication timeout is 10 min, then it could run for 27 min without timing out.
    
    I don't think that's the scope of what Adam is trying to fix. He's trying to set a timeout for the replication task of a single WAL. So, for the number of threads that a TabletServer will devote to replicating, each of them would have the same amount of time to try to execute (10 mins).



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114576210
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -536,8 +570,21 @@ public ReplicationStats execute(Client client) throws Exception {
     
           // If we have some edits to send
           if (0 < edits.walEdits.getEditsSize()) {
    +        // Check if we are interrupted before to writing the edits
    +        if (Thread.interrupted()) {
    +          log.debug("Replication work interrupted before writing edits, returning empty replication stats");
    +          return new ReplicationStats(0L, 0L, 0L);
    +        }
    +
             log.debug("Sending {} edits", edits.walEdits.getEditsSize());
             long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds);
    --- End diff --
    
    > Digging in a bit I see that ReplicationClient#getServicerConnection is using ThriftUtil#getClientNoTimeout? Maybe that is all that needs to change and this entire thing is moot.
    
    That might actually be a much simpler solution than what you've put together. Sorry I didn't think to suggest that.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to a replication R...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/accumulo/pull/254



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114592878
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -536,8 +570,21 @@ public ReplicationStats execute(Client client) throws Exception {
     
           // If we have some edits to send
           if (0 < edits.walEdits.getEditsSize()) {
    +        // Check if we are interrupted before to writing the edits
    +        if (Thread.interrupted()) {
    +          log.debug("Replication work interrupted before writing edits, returning empty replication stats");
    +          return new ReplicationStats(0L, 0L, 0L);
    +        }
    +
             log.debug("Sending {} edits", edits.walEdits.getEditsSize());
             long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds);
    --- End diff --
    
    I don't see a difference either way.  What I am trying to address is a rare block in the replication pipeline, not necessarily that it takes long to do the work.  I'd just need the RPC call to time out and bubble up so the tserver performing the replication work can release the lock and the work is retried.
    
    This does, however, complicate my testing since I am now dealing with Thrift timeouts.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114575625
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -362,6 +400,7 @@ protected Status replicateRFiles(ClientContext peerContext, final HostAndPort pe
       protected Status replicateLogs(ClientContext peerContext, final HostAndPort peerTserver, final ReplicationTarget target, final Path p, final Status status,
           final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper, final UserGroupInformation accumuloUgi)
           throws TTransportException, AccumuloException, AccumuloSecurityException {
    +    stopReplication.set(false);
    --- End diff --
    
    I checked on that and a new instance of `AccumuloReplicaSystem` is created for every task.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to a replication R...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114608208
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -161,7 +164,16 @@ public void configure(String configuration) {
       @Override
       public Status replicate(final Path p, final Status status, final ReplicationTarget target, final ReplicaSystemHelper helper) {
         final Instance localInstance = HdfsZooInstance.getInstance();
    -    final AccumuloConfiguration localConf = new ServerConfigurationFactory(localInstance).getConfiguration();
    +    AccumuloConfiguration instanceConf = new ServerConfigurationFactory(localInstance).getConfiguration();
    +
    +    // Overwrite RPC timeout value used by ClientContext with the replication timeout value
    +    Map<String,String> confMap = new HashMap<>();
    +    instanceConf.getProperties(confMap, Predicates.<String> alwaysTrue());
    +    confMap.put(Property.GENERAL_RPC_TIMEOUT.getKey(), instanceConf.get(Property.REPLICATION_RPC_TIMEOUT));
    --- End diff --
    
    I'd be in favor of that if you'd like to roll that change in too. Looks like there's precedent in that class already: https://github.com/apache/accumulo/blob/master/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java#L177



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to a replication R...

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114607540
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -161,7 +164,16 @@ public void configure(String configuration) {
       @Override
       public Status replicate(final Path p, final Status status, final ReplicationTarget target, final ReplicaSystemHelper helper) {
         final Instance localInstance = HdfsZooInstance.getInstance();
    -    final AccumuloConfiguration localConf = new ServerConfigurationFactory(localInstance).getConfiguration();
    +    AccumuloConfiguration instanceConf = new ServerConfigurationFactory(localInstance).getConfiguration();
    +
    +    // Overwrite RPC timeout value used by ClientContext with the replication timeout value
    +    Map<String,String> confMap = new HashMap<>();
    +    instanceConf.getProperties(confMap, Predicates.<String> alwaysTrue());
    +    confMap.put(Property.GENERAL_RPC_TIMEOUT.getKey(), instanceConf.get(Property.REPLICATION_RPC_TIMEOUT));
    --- End diff --
    
    Actually, I'd rather just do that.  Seems like a reasonable API to me and the only function using that private method is `getClientNoTimeout`.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114136601
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -288,15 +295,36 @@ public String execute(ReplicationCoordinator.Client client) throws Exception {
                 }
               } else {
                 span = Trace.start("WAL replication");
    +
    +            ExecutorService executor = Executors.newFixedThreadPool(1);
    +
                 try {
    -              finalStatus = replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +              Future<Status> replStatus = executor.submit(new Callable<Status>() {
    +                @Override
    +                public Status call() throws Exception {
    +                  return replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +                }
    +              });
    +
    +              log.debug("Getting replication status with timeout {}", conf.get(Property.REPLICATION_TIMEOUT));
    +              finalStatus = replStatus.get(conf.getTimeInMillis(Property.REPLICATION_TIMEOUT), MILLISECONDS);
    +              log.debug("New status for {} after replicating to {} is {}", p, peerContext.getInstance(), ProtobufUtil.toString(finalStatus));
    +            } catch (InterruptedException e) {
    +              log.debug("Interrupted exception during replication", e);
    +              Thread.currentThread().interrupt();
    +              finalStatus = status;
    +            } catch (ExecutionException e) {
    +              log.warn("Caught execution exception", e);
    +              finalStatus = status;
    +            } catch (TimeoutException e) {
    +              log.debug("Replication timeout triggered, task will be retried by the framework");
    +              finalStatus = status;
                 } finally {
                   span.stop();
    +              executor.shutdownNow();
    --- End diff --
    
    Could add comment here that shutdownNow was called instead of shutdown because it will interrupt threads.
    
    If you had an atomic boolean for interrupting, could set it here.
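
The difference between `shutdown` and `shutdownNow` can be sketched with the JDK alone. This is a minimal illustration, not the code from the pull request: the class and method names are hypothetical, and a long `Thread.sleep` stands in for a stuck replication call. After the timed `get` gives up, `shutdownNow` interrupts the worker thread, whereas `shutdown` would only stop new submissions and leave the worker blocked.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical names throughout; only java.util.concurrent is used.
class ShutdownNowSketch {

  /** Returns true if the worker observed an interrupt after the timed get() gave up. */
  static boolean workerInterruptedAfterTimeout(long timeoutMillis) throws InterruptedException {
    final CountDownLatch sawInterrupt = new CountDownLatch(1);
    ExecutorService executor = Executors.newFixedThreadPool(1);
    try {
      Future<String> result = executor.submit(new Callable<String>() {
        @Override
        public String call() {
          try {
            Thread.sleep(60_000); // stands in for a stuck replication call
            return "finished";
          } catch (InterruptedException e) {
            sawInterrupt.countDown(); // record that the interrupt arrived
            return "interrupted";
          }
        }
      });
      try {
        result.get(timeoutMillis, TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
        // Expected here: the caller stops waiting, but the worker is still running.
      } catch (ExecutionException e) {
        throw new IllegalStateException(e);
      }
    } finally {
      // shutdownNow() interrupts running workers; shutdown() would not.
      executor.shutdownNow();
    }
    return sawInterrupt.await(5, TimeUnit.SECONDS);
  }
}
```

An `AtomicBoolean` stop flag, if one existed, could be set in the same `finally` block just before `shutdownNow()`.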



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114594466
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -536,8 +570,21 @@ public ReplicationStats execute(Client client) throws Exception {
     
           // If we have some edits to send
           if (0 < edits.walEdits.getEditsSize()) {
    +        // Check if we are interrupted before to writing the edits
    +        if (Thread.interrupted()) {
    +          log.debug("Replication work interrupted before writing edits, returning empty replication stats");
    +          return new ReplicationStats(0L, 0L, 0L);
    +        }
    +
             log.debug("Sending {} edits", edits.walEdits.getEditsSize());
             long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds);
    --- End diff --
    
    I believe so.  It is using `ThriftUtil#getClientNoTimeout` to get a client for sending the mutations to the peer server.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to a replication R...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114605056
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -161,7 +164,16 @@ public void configure(String configuration) {
       @Override
       public Status replicate(final Path p, final Status status, final ReplicationTarget target, final ReplicaSystemHelper helper) {
         final Instance localInstance = HdfsZooInstance.getInstance();
    -    final AccumuloConfiguration localConf = new ServerConfigurationFactory(localInstance).getConfiguration();
    +    AccumuloConfiguration instanceConf = new ServerConfigurationFactory(localInstance).getConfiguration();
    +
    +    // Overwrite RPC timeout value used by ClientContext with the replication timeout value
    +    Map<String,String> confMap = new HashMap<>();
    +    instanceConf.getProperties(confMap, Predicates.<String> alwaysTrue());
    +    confMap.put(Property.GENERAL_RPC_TIMEOUT.getKey(), instanceConf.get(Property.REPLICATION_RPC_TIMEOUT));
    --- End diff --
    
    Yeah, I think you did the best with what you were given.
    
    That sounds like something we can improve later, circling back around to this change and eliminating this configuration copy.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114136260
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -416,6 +444,12 @@ protected Status replicateLogs(ClientContext peerContext, final HostAndPort peer
         Status lastStatus = status, currentStatus = status;
         final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
         while (true) {
    +      // Check if the thread has been interrupted prior to replicating data
    +      if (Thread.interrupted()) {
    --- End diff --
    
    In the past I have found that other libraries (like Hadoop) eat interrupts, so an interrupt may never be seen.  I am not sure, but I was wondering if it would make sense to have an additional atomic boolean that you set and check.  I had to do this to reliably interrupt scans in Accumulo.  If there were an AtomicBoolean, you could do something like `if(Thread.interrupted() || stopped.get())`.
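
A small sketch of why the extra flag helps, assuming a callee that catches `InterruptedException` and discards it. Everything here is hypothetical: `libraryCallThatEatsInterrupts` is a stand-in for whatever third-party code swallows the interrupt, and `workLoop` only mirrors the shape of the suggested check.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical names; the "library" method only imitates code that swallows interrupts.
class StopFlagSketch {

  /** Stand-in for a library call that catches the interrupt and does not restore it. */
  static void libraryCallThatEatsInterrupts() {
    try {
      Thread.sleep(10);
    } catch (InterruptedException e) {
      // Swallowed: the interrupt status was cleared by sleep() and is not set again.
    }
  }

  /** Loops until asked to stop and reports which signal ended the loop. */
  static String workLoop(AtomicBoolean stopped) {
    while (true) {
      // Check both signals: the interrupt may have been eaten, the flag cannot be.
      if (Thread.interrupted()) {
        return "stopped by interrupt";
      }
      if (stopped.get()) {
        return "stopped by flag";
      }
      libraryCallThatEatsInterrupts();
    }
  }
}
```

If the interrupt lands while the thread is inside the library call, the loop never sees it; the flag, set by whoever requested the stop, still ends the loop on the next pass.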



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to a replication RPC call

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on the issue:

    https://github.com/apache/accumulo/pull/254
  
    This LGTM. WDYT, @keith-turner ?



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on the issue:

    https://github.com/apache/accumulo/pull/254
  
    Any chance this could be reviewed today?  I'd like to get it blessed before applying it on my end to address our production issue.  Thank you!



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on the issue:

    https://github.com/apache/accumulo/pull/254
  
    @joshelser Back at you.  I think I managed to scrape together a test that I am happy with.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114594781
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -536,8 +570,21 @@ public ReplicationStats execute(Client client) throws Exception {
     
           // If we have some edits to send
           if (0 < edits.walEdits.getEditsSize()) {
    +        // Check if we are interrupted before to writing the edits
    +        if (Thread.interrupted()) {
    +          log.debug("Replication work interrupted before writing edits, returning empty replication stats");
    +          return new ReplicationStats(0L, 0L, 0L);
    +        }
    +
             log.debug("Sending {} edits", edits.walEdits.getEditsSize());
             long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds);
    --- End diff --
    
    > This does, however, complicate my testing since I am now dealing with Thrift timeouts.
    
    I'm actually OK dropping your test completely. That's out of scope -- you should just be able to use that call "as API". Not your fault if it doesn't work correctly.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114387702
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -416,6 +444,12 @@ protected Status replicateLogs(ClientContext peerContext, final HostAndPort peer
         Status lastStatus = status, currentStatus = status;
         final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
         while (true) {
    +      // Check if the thread has been interrupted prior to replicating data
    +      if (Thread.interrupted()) {
    --- End diff --
    
    Fair point; that is hard to say.  I'll add it regardless to make sure the thread will stop appropriately.



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on the issue:

    https://github.com/apache/accumulo/pull/254
  
    Not too sure how to go about adding a unit or integration test for this timeout.  I tested the timeout by launching two clusters and replicating data with a short timeout.  Replication was halted on the tablet server and re-scheduled at a later time.  Increasing the timeout allowed replication to finish as expected.



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on the issue:

    https://github.com/apache/accumulo/pull/254
  
    Let me bring this out of the thread.  Given the good find by @adamjshook that the code calls the Thrift RPC method without a timeout, I think this change could be greatly simplified: add a configuration property for the timeout on WAL replication RPCs (from the source to the peer cluster) and switch that RPC to use the with-timeout method on ThriftUtil.  That would literally take 3 or 4 lines of code.



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to a replication RPC call

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on the issue:

    https://github.com/apache/accumulo/pull/254
  
    I think it looks good.  Thanks @adamjshook 



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114137768
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -536,8 +570,21 @@ public ReplicationStats execute(Client client) throws Exception {
     
           // If we have some edits to send
           if (0 < edits.walEdits.getEditsSize()) {
    +        // Check if we are interrupted before to writing the edits
    +        if (Thread.interrupted()) {
    +          log.debug("Replication work interrupted before writing edits, returning empty replication stats");
    +          return new ReplicationStats(0L, 0L, 0L);
    +        }
    +
             log.debug("Sending {} edits", edits.walEdits.getEditsSize());
             long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds);
    --- End diff --
    
    Should this client's timeout be set to some function of the replication timeout?



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114577226
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -536,8 +570,21 @@ public ReplicationStats execute(Client client) throws Exception {
     
           // If we have some edits to send
           if (0 < edits.walEdits.getEditsSize()) {
    +        // Check if we are interrupted before to writing the edits
    +        if (Thread.interrupted()) {
    +          log.debug("Replication work interrupted before writing edits, returning empty replication stats");
    +          return new ReplicationStats(0L, 0L, 0L);
    +        }
    +
             log.debug("Sending {} edits", edits.walEdits.getEditsSize());
             long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds);
    --- End diff --
    
    I also just gave master a quick look: AccumuloReplicaSystem is the only caller of that method, so you don't have to worry about other callers having the semantics changed.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114576956
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -536,8 +570,21 @@ public ReplicationStats execute(Client client) throws Exception {
     
           // If we have some edits to send
           if (0 < edits.walEdits.getEditsSize()) {
    +        // Check if we are interrupted before to writing the edits
    +        if (Thread.interrupted()) {
    +          log.debug("Replication work interrupted before writing edits, returning empty replication stats");
    +          return new ReplicationStats(0L, 0L, 0L);
    +        }
    +
             log.debug("Sending {} edits", edits.walEdits.getEditsSize());
             long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds);
    --- End diff --
    
    I'll give it a shot.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to a replication R...

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114605401
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -161,7 +164,16 @@ public void configure(String configuration) {
       @Override
       public Status replicate(final Path p, final Status status, final ReplicationTarget target, final ReplicaSystemHelper helper) {
         final Instance localInstance = HdfsZooInstance.getInstance();
    -    final AccumuloConfiguration localConf = new ServerConfigurationFactory(localInstance).getConfiguration();
    +    AccumuloConfiguration instanceConf = new ServerConfigurationFactory(localInstance).getConfiguration();
    +
    +    // Overwrite RPC timeout value used by ClientContext with the replication timeout value
    +    Map<String,String> confMap = new HashMap<>();
    +    instanceConf.getProperties(confMap, Predicates.<String> alwaysTrue());
    +    confMap.put(Property.GENERAL_RPC_TIMEOUT.getKey(), instanceConf.get(Property.REPLICATION_RPC_TIMEOUT));
    --- End diff --
    
    That, or make https://github.com/apache/accumulo/blob/master/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java#L137 `public`.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114594106
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -536,8 +570,21 @@ public ReplicationStats execute(Client client) throws Exception {
     
           // If we have some edits to send
           if (0 < edits.walEdits.getEditsSize()) {
    +        // Check if we are interrupted before to writing the edits
    +        if (Thread.interrupted()) {
    +          log.debug("Replication work interrupted before writing edits, returning empty replication stats");
    +          return new ReplicationStats(0L, 0L, 0L);
    +        }
    +
             log.debug("Sending {} edits", edits.walEdits.getEditsSize());
             long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds);
    --- End diff --
    
    >  I'd just need the RPC call to timeout and bubble up
    
    Can the thrift call just hang forever now?
    




[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r113839682
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -288,15 +295,36 @@ public String execute(ReplicationCoordinator.Client client) throws Exception {
                 }
               } else {
                 span = Trace.start("WAL replication");
    +
    +            ExecutorService executor = Executors.newFixedThreadPool(1);
    --- End diff --
    
    Wasn't aware of that special Executor -- that'll work here, no need for another thread.  Thanks!



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114392548
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -416,6 +444,12 @@ protected Status replicateLogs(ClientContext peerContext, final HostAndPort peer
         Status lastStatus = status, currentStatus = status;
         final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
         while (true) {
    +      // Check if the thread has been interrupted prior to replicating data
    +      if (Thread.interrupted()) {
    --- End diff --
    
    All set.



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on the issue:

    https://github.com/apache/accumulo/pull/254
  
    Thanks for the pointers.  I'll head down this route and get back to you soon.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r113838502
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -288,15 +295,36 @@ public String execute(ReplicationCoordinator.Client client) throws Exception {
                 }
               } else {
                 span = Trace.start("WAL replication");
    +
    +            ExecutorService executor = Executors.newFixedThreadPool(1);
    --- End diff --
    
    Any reason that we need to start an extra thread here?
    
    Guava has a `MoreExecutors` class with a special Executor that uses the same thread. Would that work here?
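
One thing worth keeping in mind with a same-thread executor: the task runs inside `submit()` on the calling thread, so the `Future` is already complete by the time a timed `get` is called, and the timeout cannot cut a slow task short. The sketch below imitates that behavior with a plain JDK `FutureTask` run inline rather than with Guava itself; the class and method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

// Hypothetical names; FutureTask.run() stands in for a same-thread executor's submit().
class SameThreadSketch {

  /** Runs the task inline, then asks for the result with a timeout shorter than the task. */
  static String runInline(final long workMillis, long timeoutMillis) throws Exception {
    FutureTask<String> task = new FutureTask<String>(new Callable<String>() {
      @Override
      public String call() throws Exception {
        Thread.sleep(workMillis); // stands in for slow replication work
        return "finished";
      }
    });
    // Blocks the calling thread until call() returns, as a same-thread executor would.
    task.run();
    // The task is already done, so this returns at once and never throws TimeoutException.
    return task.get(timeoutMillis, TimeUnit.MILLISECONDS);
  }
}
```

With a separate worker thread, the same call with a 1 ms timeout and 100 ms of work would throw `TimeoutException`; run inline, it simply returns the result after the full 100 ms.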



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r113838774
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -536,8 +570,21 @@ public ReplicationStats execute(Client client) throws Exception {
     
           // If we have some edits to send
           if (0 < edits.walEdits.getEditsSize()) {
    +        // Check if we are interrupted before to writing the edits
    +        if (Thread.interrupted()) {
    +          log.debug("Replication work interrupted before writing edits, returning empty replstats");
    --- End diff --
    
    nit: whole words instead of `replstats` here, please.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114593667
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -536,8 +570,21 @@ public ReplicationStats execute(Client client) throws Exception {
     
           // If we have some edits to send
           if (0 < edits.walEdits.getEditsSize()) {
    +        // Check if we are interrupted before to writing the edits
    +        if (Thread.interrupted()) {
    +          log.debug("Replication work interrupted before writing edits, returning empty replication stats");
    +          return new ReplicationStats(0L, 0L, 0L);
    +        }
    +
             log.debug("Sending {} edits", edits.walEdits.getEditsSize());
             long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds);
    --- End diff --
    
    > I don't see a difference either way.
    
    If the thread interruption properly percolates up from the thrift call, then it does seem functionally equivalent.
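
One detail relevant to the check in this diff: `Thread.interrupted()` clears the interrupt flag as it reads it, so later code on the same thread will not observe the interrupt unless the flag is set again. A small JDK-only sketch of that behavior follows; the class name is illustrative, and it makes no claim about how the Thrift call itself reacts to interruption.

```java
class InterruptFlagDemo {
  // Returns {first check, second check, check after restoring the flag}.
  static boolean[] observe() {
    Thread.currentThread().interrupt();    // simulate a pending interrupt
    boolean first = Thread.interrupted();  // reads the flag and clears it
    boolean second = Thread.interrupted(); // flag was already cleared by the first read
    Thread.currentThread().interrupt();    // restore the flag for later code
    boolean restored = Thread.currentThread().isInterrupted(); // reads without clearing
    Thread.interrupted();                  // clear again so the calling thread stays clean
    return new boolean[] {first, second, restored};
  }

  public static void main(String[] args) {
    boolean[] seen = observe();
    System.out.println("first=" + seen[0] + " second=" + seen[1] + " restored=" + seen[2]);
  }
}
```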
    




[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to a replication R...

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r114604832
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -161,7 +164,16 @@ public void configure(String configuration) {
       @Override
       public Status replicate(final Path p, final Status status, final ReplicationTarget target, final ReplicaSystemHelper helper) {
         final Instance localInstance = HdfsZooInstance.getInstance();
    -    final AccumuloConfiguration localConf = new ServerConfigurationFactory(localInstance).getConfiguration();
    +    AccumuloConfiguration instanceConf = new ServerConfigurationFactory(localInstance).getConfiguration();
    +
    +    // Overwrite RPC timeout value used by ClientContext with the replication timeout value
    +    Map<String,String> confMap = new HashMap<>();
    +    instanceConf.getProperties(confMap, Predicates.<String> alwaysTrue());
    +    confMap.put(Property.GENERAL_RPC_TIMEOUT.getKey(), instanceConf.get(Property.REPLICATION_RPC_TIMEOUT));
    --- End diff --
    
    Yeah, that'd be nice :)  It gets it from the `ClientContext` and the context returns the value of `GENERAL_RPC_TIMEOUT`.  No way to change it except through copying the configuration.
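
The copy-and-override approach described here can be sketched with plain maps. This is an illustration only: `ConfigOverrideDemo`, `withOverride`, and the property keys are hypothetical placeholders, not Accumulo's configuration API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class ConfigOverrideDemo {
  // Copies base and sets targetKey to the value currently stored under sourceKey.
  static Map<String,String> withOverride(Map<String,String> base, String targetKey, String sourceKey) {
    Map<String,String> copy = new HashMap<>(base); // leave the original untouched
    copy.put(targetKey, base.get(sourceKey));
    return Collections.unmodifiableMap(copy);
  }

  public static void main(String[] args) {
    Map<String,String> base = new HashMap<>();
    base.put("example.rpc.timeout", "120s");             // placeholder key
    base.put("example.replication.rpc.timeout", "30s");  // placeholder key
    Map<String,String> view = withOverride(base, "example.rpc.timeout", "example.replication.rpc.timeout");
    System.out.println(base.get("example.rpc.timeout") + " -> " + view.get("example.rpc.timeout"));
  }
}
```

Code that only knows how to read the general key then picks up the replication-specific value from the copy, while the instance-wide configuration is left as it was.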



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r113839773
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -288,15 +295,36 @@ public String execute(ReplicationCoordinator.Client client) throws Exception {
                 }
               } else {
                 span = Trace.start("WAL replication");
    +
    +            ExecutorService executor = Executors.newFixedThreadPool(1);
    +
                 try {
    -              finalStatus = replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +              Future<Status> replStatus = executor.submit(new Callable<Status>() {
    +                @Override
    +                public Status call() throws Exception {
    +                  return replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +                }
    +              });
    +
    +              log.debug("Getting replication status with timeout {}", conf.get(Property.REPLICATION_TIMEOUT));
    +              finalStatus = replStatus.get(conf.getTimeInMillis(Property.REPLICATION_TIMEOUT), MILLISECONDS);
    +              log.debug("New status for {} after replicating to {} is {}", p, peerContext.getInstance(), ProtobufUtil.toString(finalStatus));
    +            } catch (InterruptedException e) {
    +              log.debug("Interrupted exception during replication", e);
    +              Thread.currentThread().interrupt();
    +              finalStatus = status;
    +            } catch (ExecutionException e) {
    +              log.warn("Caught execution exception", e);
    +              finalStatus = status;
    +            } catch (TimeoutException e) {
    +              log.debug("Replication timeout triggered, shutting down");
    --- End diff --
    
    Correct, I'll add that to the message.



[GitHub] accumulo pull request #254: [ACCUMULO-4506] Add a timeout to the replication...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/254#discussion_r113838793
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---
    @@ -288,15 +295,36 @@ public String execute(ReplicationCoordinator.Client client) throws Exception {
                 }
               } else {
                 span = Trace.start("WAL replication");
    +
    +            ExecutorService executor = Executors.newFixedThreadPool(1);
    +
                 try {
    -              finalStatus = replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +              Future<Status> replStatus = executor.submit(new Callable<Status>() {
    +                @Override
    +                public Status call() throws Exception {
    +                  return replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
    +                }
    +              });
    +
    +              log.debug("Getting replication status with timeout {}", conf.get(Property.REPLICATION_TIMEOUT));
    +              finalStatus = replStatus.get(conf.getTimeInMillis(Property.REPLICATION_TIMEOUT), MILLISECONDS);
    +              log.debug("New status for {} after replicating to {} is {}", p, peerContext.getInstance(), ProtobufUtil.toString(finalStatus));
    +            } catch (InterruptedException e) {
    +              log.debug("Interrupted exception during replication", e);
    +              Thread.currentThread().interrupt();
    +              finalStatus = status;
    +            } catch (ExecutionException e) {
    +              log.warn("Caught execution exception", e);
    +              finalStatus = status;
    +            } catch (TimeoutException e) {
    +              log.debug("Replication timeout triggered, shutting down");
    +              finalStatus = status;
                 } finally {
                   span.stop();
    +              executor.shutdownNow();
                 }
               }
     
    -          log.debug("New status for {} after replicating to {} is {}", p, peerContext.getInstance(), ProtobufUtil.toString(finalStatus));
    --- End diff --
    
    What's the reasoning behind dropping this debug msg?
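
For readers following the diff above, the general shape of the pattern is: submit the work to a worker thread, wait on the `Future` with a bound, and call `shutdownNow()` in `finally` so a still-running worker is interrupted. A reduced JDK-only sketch follows; `TimedTaskDemo`, the sleeping task, and the returned strings are illustrative and stand in for the replication call and `Status` handling in the pull request.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedTaskDemo {
  // Runs a task that sleeps for workMillis, waiting at most timeoutMillis for its result.
  static String runWithTimeout(long workMillis, long timeoutMillis) {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    Callable<String> work = () -> {
      Thread.sleep(workMillis); // stand-in for the long-running call
      return "done";
    };
    Future<String> future = executor.submit(work);
    try {
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      return "timed out";                 // caller gives up; the work is abandoned
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the caller's interrupt status
      return "interrupted";
    } catch (ExecutionException e) {
      return "failed";                    // the task itself threw
    } finally {
      executor.shutdownNow();             // interrupts the worker if it is still running
    }
  }

  public static void main(String[] args) {
    System.out.println(runWithTimeout(10, 2000));
    System.out.println(runWithTimeout(5000, 50));
  }
}
```

Note that `shutdownNow()` only delivers an interrupt; the worker stops promptly only if the code it is running responds to interruption, which is what the `Thread.interrupted()` checks elsewhere in the diff are for.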



[GitHub] accumulo issue #254: [ACCUMULO-4506] Add a timeout to the replication task

Posted by adamjshook <gi...@git.apache.org>.
Github user adamjshook commented on the issue:

    https://github.com/apache/accumulo/pull/254
  
    @joshelser I think so.  I was taking a look at `AccumuloReplicaSystemTest` to see how I could fit something in to trigger the timeout, but the `replicate` call takes a `ReplicaSystemHelper` as an argument and I wasn't sure how to go about creating an instance of that.  Even then, we could only assert that the `Status` didn't change, and it could remain unchanged for a number of reasons.

