You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tephra.apache.org by gokulavasan <gi...@git.apache.org> on 2017/02/07 08:18:06 UTC

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

GitHub user gokulavasan opened a pull request:

    https://github.com/apache/incubator-tephra/pull/32

    (TEPHRA-215) (TEPHRA-218) Use single thread across all regions in a r\u2026

    \u2026egion server to persist Prune Upper Bound info. Also don't refresh cache during startup of TransactionStateCache to avoid the possibility of Service stopping if tx.snapshot dir is not found during startup
    
    JIRA : 
    https://issues.apache.org/jira/browse/TEPHRA-215
    https://issues.apache.org/jira/browse/TEPHRA-218

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gokulavasan/incubator-tephra feature/tephra-215

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-tephra/pull/32.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #32
    
----
commit 542e0f3512c22387c3fe349baeb1744b64a4ca06
Author: Gokul Gunasekaran <go...@cask.co>
Date:   2017-02-07T08:17:14Z

    (TEPHRA-215) (TEPHRA-218) Use single thread across all regions in a region server to persist Prune Upper Bound info. Also don't refresh cache during startup of TransactionStateCache to avoid the possibility of Service stopping if tx.snapshot dir is not found during startup

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100202882
  
    --- Diff: tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.txprune;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +/**
    + * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}.
    + */
    +public class PruneUpperBoundWriterSupplierTest {
    +  private static final int NUM_OPS = 10;
    +
    +  @Test
    +  public void testSupplier() throws Exception {
    +    final PruneUpperBoundWriterSupplier supplier = new PruneUpperBoundWriterSupplier(null, null, 10L);
    +    final PruneUpperBoundWriter writer = supplier.get();
    +    final AtomicInteger numOps = new AtomicInteger(NUM_OPS);
    +    final Random random = new Random(System.currentTimeMillis());
    +
    +    // Start threads that will 'get' PruneUpperBoundWriters
    +    ExecutorService executor = Executors.newFixedThreadPool(3);
    +    List<Future> futureList = new ArrayList<>();
    +    for (int i = 0; i < 3; i++) {
    --- End diff --
    
    It would be good to have more threads too


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by gokulavasan <gi...@git.apache.org>.
Github user gokulavasan commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100187618
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---
    @@ -18,56 +18,56 @@
     
     package org.apache.tephra.hbase.txprune;
     
    +import com.google.common.util.concurrent.AbstractIdleService;
     import org.apache.commons.logging.Log;
     import org.apache.commons.logging.LogFactory;
    -import org.apache.hadoop.hbase.TableName;
    -import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
     
     import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentSkipListMap;
     import java.util.concurrent.TimeUnit;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -import java.util.concurrent.atomic.AtomicLong;
     
     /**
      * Thread that will write the the prune upper bound
      */
    -public class PruneUpperBoundWriter {
    -  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
    +public class PruneUpperBoundWriter extends AbstractIdleService {
    +  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
     
    -  private final TableName pruneStateTable;
       private final DataJanitorState dataJanitorState;
    -  private final byte[] regionName;
    -  private final String regionNameAsString;
       private final long pruneFlushInterval;
    -  private final AtomicLong pruneUpperBound;
    -  private final AtomicBoolean shouldFlush;
    +  private final ConcurrentSkipListMap<ByteBuffer, Long> pruneEntries;
    +
    +  private volatile boolean stopped;
    +  private volatile Thread flushThread;
     
    -  private Thread flushThread;
       private long lastChecked;
     
    -  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
    -                               byte[] regionName, long pruneFlushInterval) {
    -    this.pruneStateTable = pruneStateTable;
    +  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, long pruneFlushInterval) {
         this.dataJanitorState = dataJanitorState;
    -    this.regionName = regionName;
    -    this.regionNameAsString = regionNameAsString;
         this.pruneFlushInterval = pruneFlushInterval;
    -    this.pruneUpperBound = new AtomicLong();
    -    this.shouldFlush = new AtomicBoolean(false);
    -    startFlushThread();
    +    this.pruneEntries = new ConcurrentSkipListMap<>();
    +  }
    +
    +  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
    +    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
    +    // grow indefinitely
    +    pruneEntries.put(ByteBuffer.wrap(regionName), pruneUpperBound);
       }
     
       public boolean isAlive() {
    -    return flushThread.isAlive();
    +    return flushThread != null && flushThread.isAlive();
       }
     
    -  public void persistPruneEntry(long pruneUpperBound) {
    -    this.pruneUpperBound.set(pruneUpperBound);
    -    this.shouldFlush.set(true);
    +  @Override
    +  protected void startUp() throws Exception {
    +    startFlushThread();
       }
     
    -  public void stop() {
    +  @Override
    +  protected void shutDown() throws Exception {
         if (flushThread != null) {
    --- End diff --
    
    Added log statements.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100048456
  
    --- Diff: tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java ---
    @@ -0,0 +1,119 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.txprune;
    +
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.tephra.TxConstants;
    +import org.apache.tephra.hbase.AbstractHBaseTableTest;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +/**
    + * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}.
    + */
    +public class PruneUpperBoundWriterSupplierTest extends AbstractHBaseTableTest {
    --- End diff --
    
    Can you add comments on what this test is trying to do? I'm not sure I follow the test.
    
    Also, do we need to start an HBase instance for this test? We just need to make sure that prune writer gets stopped when reference count reaches zero, right? In that case can we pass in DataJanitorState as null to PruneUpperBoundWriterSupplier, and not call persistPruneEntry?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by gokulavasan <gi...@git.apache.org>.
Github user gokulavasan commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100172848
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---
    @@ -18,56 +18,56 @@
     
     package org.apache.tephra.hbase.txprune;
     
    +import com.google.common.util.concurrent.AbstractIdleService;
     import org.apache.commons.logging.Log;
     import org.apache.commons.logging.LogFactory;
    -import org.apache.hadoop.hbase.TableName;
    -import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
     
     import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentSkipListMap;
     import java.util.concurrent.TimeUnit;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -import java.util.concurrent.atomic.AtomicLong;
     
     /**
      * Thread that will write the the prune upper bound
      */
    -public class PruneUpperBoundWriter {
    -  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
    +public class PruneUpperBoundWriter extends AbstractIdleService {
    +  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
     
    -  private final TableName pruneStateTable;
       private final DataJanitorState dataJanitorState;
    -  private final byte[] regionName;
    -  private final String regionNameAsString;
       private final long pruneFlushInterval;
    -  private final AtomicLong pruneUpperBound;
    -  private final AtomicBoolean shouldFlush;
    +  private final ConcurrentSkipListMap<ByteBuffer, Long> pruneEntries;
    --- End diff --
    
    We need the key to be Comparable. Hence we need to use ByteBuffer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra issue #32: (TEPHRA-215) (TEPHRA-218) Use single thread acro...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on the issue:

    https://github.com/apache/incubator-tephra/pull/32
  
    LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100042435
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tephra.hbase.txprune;
    +
    +
    +import com.google.common.base.Supplier;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * Supplies instances of {@link PruneUpperBoundWriter} implementations.
    + */
    +public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
    +  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
    +
    +  private static volatile PruneUpperBoundWriter instance;
    +  private static volatile int refCount = 0;
    +  private static final Object lock = new Object();
    +
    +  private final DataJanitorState dataJanitorState;
    +  private final long pruneFlushInterval;
    +
    +  public PruneUpperBoundWriterSupplier(DataJanitorState dataJanitorState, long pruneFlushInterval) {
    +    this.dataJanitorState = dataJanitorState;
    +    this.pruneFlushInterval = pruneFlushInterval;
    +  }
    +
    +  @Override
    +  public PruneUpperBoundWriter get() {
    +    synchronized (lock) {
    --- End diff --
    
    We can make an `instance == null` check outside the lock. In case instance is not null just increment refCount and return.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r99881005
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---
    @@ -79,15 +72,17 @@ public void run() {
             while (!isInterrupted()) {
               long now = System.currentTimeMillis();
               if (now > (lastChecked + pruneFlushInterval)) {
    -            if (shouldFlush.compareAndSet(true, false)) {
    +            if (!pruneEntries.isEmpty()) {
                   // should flush data
                   try {
    -                dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound.get());
    +                Iterator<PruneInfo> iterator = pruneEntries.iterator();
    --- End diff --
    
    Since we are doing concurrent operations here it would be good to use `peek()` and `poll()`, rather than a iterator.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by gokulavasan <gi...@git.apache.org>.
Github user gokulavasan commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100173006
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---
    @@ -18,56 +18,56 @@
     
     package org.apache.tephra.hbase.txprune;
     
    +import com.google.common.util.concurrent.AbstractIdleService;
     import org.apache.commons.logging.Log;
     import org.apache.commons.logging.LogFactory;
    -import org.apache.hadoop.hbase.TableName;
    -import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
     
     import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentSkipListMap;
     import java.util.concurrent.TimeUnit;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -import java.util.concurrent.atomic.AtomicLong;
     
     /**
      * Thread that will write the the prune upper bound
      */
    -public class PruneUpperBoundWriter {
    -  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
    +public class PruneUpperBoundWriter extends AbstractIdleService {
    +  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
     
    -  private final TableName pruneStateTable;
       private final DataJanitorState dataJanitorState;
    -  private final byte[] regionName;
    -  private final String regionNameAsString;
       private final long pruneFlushInterval;
    -  private final AtomicLong pruneUpperBound;
    -  private final AtomicBoolean shouldFlush;
    +  private final ConcurrentSkipListMap<ByteBuffer, Long> pruneEntries;
    +
    +  private volatile boolean stopped;
    +  private volatile Thread flushThread;
     
    -  private Thread flushThread;
       private long lastChecked;
     
    -  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
    -                               byte[] regionName, long pruneFlushInterval) {
    -    this.pruneStateTable = pruneStateTable;
    +  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, long pruneFlushInterval) {
         this.dataJanitorState = dataJanitorState;
    -    this.regionName = regionName;
    -    this.regionNameAsString = regionNameAsString;
         this.pruneFlushInterval = pruneFlushInterval;
    -    this.pruneUpperBound = new AtomicLong();
    -    this.shouldFlush = new AtomicBoolean(false);
    -    startFlushThread();
    +    this.pruneEntries = new ConcurrentSkipListMap<>();
    +  }
    +
    +  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
    +    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
    +    // grow indefinitely
    +    pruneEntries.put(ByteBuffer.wrap(regionName), pruneUpperBound);
       }
     
       public boolean isAlive() {
    -    return flushThread.isAlive();
    +    return flushThread != null && flushThread.isAlive();
       }
     
    -  public void persistPruneEntry(long pruneUpperBound) {
    -    this.pruneUpperBound.set(pruneUpperBound);
    -    this.shouldFlush.set(true);
    +  @Override
    +  protected void startUp() throws Exception {
    +    startFlushThread();
       }
     
    -  public void stop() {
    +  @Override
    +  protected void shutDown() throws Exception {
         if (flushThread != null) {
    +      stopped = true;
           flushThread.interrupt();
         }
    --- End diff --
    
    What would be a good timeout?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100202537
  
    --- Diff: tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java ---
    @@ -334,6 +337,46 @@ public Table get() throws IOException {
         }
       }
     
    +  @Test(timeout = 60000L)
    +  public void testClusterShutdown() throws Exception {
    --- End diff --
    
    Is this test required now?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r99879439
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java ---
    @@ -0,0 +1,52 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tephra.hbase.txprune;
    +
    +
    +import com.google.common.base.Supplier;
    +
    +/**
    + * Supplies instances of {@link PruneUpperBoundWriter} implementations.
    + */
    +public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
    +
    +  private final DataJanitorState dataJanitorState;
    +  private final long pruneFlushInterval;
    +
    +  protected static volatile PruneUpperBoundWriter instance;
    +  protected static Object lock = new Object();
    --- End diff --
    
    Same here about protected. Also it is better to make the `lock` object final.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100035027
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---
    @@ -167,7 +167,7 @@ protected Configuration getConfiguration(CoprocessorEnvironment env) {
       protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
         return new TransactionStateCacheSupplier(env.getConfiguration());
       }
    -
    +  
    --- End diff --
    
    If you reset this newline then this file will not have any changes


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100039885
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---
    @@ -76,18 +76,21 @@ private void startFlushThread() {
         flushThread = new Thread("tephra-prune-upper-bound-writer") {
           @Override
           public void run() {
    -        while (!isInterrupted()) {
    +        while (!isInterrupted() && !stopped) {
               long now = System.currentTimeMillis();
               if (now > (lastChecked + pruneFlushInterval)) {
    -            if (shouldFlush.compareAndSet(true, false)) {
    +            if (!pruneEntries.isEmpty()) {
    --- End diff --
    
    This check is no longer required since we check for `while (pruneEntries.firstEntry() != null)` later


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra issue #32: (TEPHRA-215) (TEPHRA-218) Use single thread acro...

Posted by gokulavasan <gi...@git.apache.org>.
Github user gokulavasan commented on the issue:

    https://github.com/apache/incubator-tephra/pull/32
  
    @poornachandra Addressed comments. Please take a look when you get a chance.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100202843
  
    --- Diff: tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.txprune;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +/**
    + * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}.
    + */
    +public class PruneUpperBoundWriterSupplierTest {
    +  private static final int NUM_OPS = 10;
    +
    +  @Test
    +  public void testSupplier() throws Exception {
    +    final PruneUpperBoundWriterSupplier supplier = new PruneUpperBoundWriterSupplier(null, null, 10L);
    +    final PruneUpperBoundWriter writer = supplier.get();
    +    final AtomicInteger numOps = new AtomicInteger(NUM_OPS);
    +    final Random random = new Random(System.currentTimeMillis());
    +
    +    // Start threads that will 'get' PruneUpperBoundWriters
    +    ExecutorService executor = Executors.newFixedThreadPool(3);
    +    List<Future> futureList = new ArrayList<>();
    +    for (int i = 0; i < 3; i++) {
    +      futureList.add(executor.submit(new Callable<Void>() {
    +
    +        @Override
    +        public Void call() throws Exception {
    +          // Since we already got one PruneUpperBoundWriter, we need to get NUM_OPS - 1
    +          while (numOps.decrementAndGet() > 0) {
    +            PruneUpperBoundWriter newWriter = supplier.get();
    +            Assert.assertTrue(newWriter == writer);
    +            int waitTime = random.nextInt(10);
    +            TimeUnit.MICROSECONDS.sleep(waitTime);
    +          }
    +          return null;
    +        }
    +      }));
    +    }
    +
    +    for (Future future : futureList) {
    +      future.get(5, TimeUnit.SECONDS);
    +    }
    +    executor.shutdown();
    +    executor.awaitTermination(2, TimeUnit.SECONDS);
    +
    +    futureList.clear();
    +    numOps.set(NUM_OPS);
    +    // Start thread that release PruneUpperBoundWriters
    +    executor = Executors.newFixedThreadPool(3);
    +    for (int i = 0; i < 3; i++) {
    +      futureList.add(executor.submit(new Callable<Void>() {
    --- End diff --
    
    Same here about using `Runnable`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100201530
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---
    @@ -18,77 +18,85 @@
     
     package org.apache.tephra.hbase.txprune;
     
    +import com.google.common.util.concurrent.AbstractIdleService;
     import org.apache.commons.logging.Log;
     import org.apache.commons.logging.LogFactory;
     import org.apache.hadoop.hbase.TableName;
    -import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
     
     import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentSkipListMap;
     import java.util.concurrent.TimeUnit;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -import java.util.concurrent.atomic.AtomicLong;
     
     /**
      * Thread that will write the the prune upper bound
      */
    -public class PruneUpperBoundWriter {
    -  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
    +public class PruneUpperBoundWriter extends AbstractIdleService {
    +  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
     
    -  private final TableName pruneStateTable;
    +  private final TableName tableName;
       private final DataJanitorState dataJanitorState;
    -  private final byte[] regionName;
    -  private final String regionNameAsString;
       private final long pruneFlushInterval;
    -  private final AtomicLong pruneUpperBound;
    -  private final AtomicBoolean shouldFlush;
    +  private final ConcurrentSkipListMap<ByteBuffer, Long> pruneEntries;
    +
    +  private volatile boolean stopped;
    +  private volatile Thread flushThread;
     
    -  private Thread flushThread;
       private long lastChecked;
     
    -  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
    -                               byte[] regionName, long pruneFlushInterval) {
    -    this.pruneStateTable = pruneStateTable;
    +  public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
    +    this.tableName = tableName;
         this.dataJanitorState = dataJanitorState;
    -    this.regionName = regionName;
    -    this.regionNameAsString = regionNameAsString;
         this.pruneFlushInterval = pruneFlushInterval;
    -    this.pruneUpperBound = new AtomicLong();
    -    this.shouldFlush = new AtomicBoolean(false);
    -    startFlushThread();
    +    this.pruneEntries = new ConcurrentSkipListMap<>();
    +  }
    +
    +  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
    +    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
    +    // grow indefinitely
    +    pruneEntries.put(ByteBuffer.wrap(regionName), pruneUpperBound);
       }
     
       public boolean isAlive() {
    -    return flushThread.isAlive();
    +    return flushThread != null && flushThread.isAlive();
       }
     
    -  public void persistPruneEntry(long pruneUpperBound) {
    -    this.pruneUpperBound.set(pruneUpperBound);
    -    this.shouldFlush.set(true);
    +  @Override
    +  protected void startUp() throws Exception {
    +    LOG.info("Starting PruneUpperBoundWriter Thread.");
    +    startFlushThread();
       }
     
    -  public void stop() {
    +  @Override
    +  protected void shutDown() throws Exception {
         if (flushThread != null) {
    +      stopped = true;
    +      LOG.info("Stopping PruneUpperBoundWriter Thread.");
    --- End diff --
    
    It would be good to move this log message outside the if condition


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100042824
  
    --- Diff: tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java ---
    @@ -360,8 +404,18 @@ public TransactionStateCache get() {
         @Override
         public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
                                 CompactionRequest request) throws IOException {
    -      super.postCompact(e, store, resultFile, request);
    -      lastMajorCompactionTime.set(System.currentTimeMillis());
    +      synchronized (compactionLock) {
    +        super.postCompact(e, store, resultFile, request);
    +        lastMajorCompactionTime.set(System.currentTimeMillis());
    +      }
    +    }
    +
    +    @Override
    +    public void stop(CoprocessorEnvironment e) throws IOException {
    +      synchronized (stopLock) {
    +        System.out.println("*********** Stopping " + this.getClass().getName());
    --- End diff --
    
    Remove the debug println


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-tephra/pull/32


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100203117
  
    --- Diff: tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.txprune;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +/**
    + * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}.
    + */
    +public class PruneUpperBoundWriterSupplierTest {
    +  private static final int NUM_OPS = 10;
    +
    +  @Test
    +  public void testSupplier() throws Exception {
    +    final PruneUpperBoundWriterSupplier supplier = new PruneUpperBoundWriterSupplier(null, null, 10L);
    +    final PruneUpperBoundWriter writer = supplier.get();
    +    final AtomicInteger numOps = new AtomicInteger(NUM_OPS);
    +    final Random random = new Random(System.currentTimeMillis());
    +
    +    // Start threads that will 'get' PruneUpperBoundWriters
    +    ExecutorService executor = Executors.newFixedThreadPool(3);
    +    List<Future> futureList = new ArrayList<>();
    +    for (int i = 0; i < 3; i++) {
    +      futureList.add(executor.submit(new Callable<Void>() {
    +
    +        @Override
    +        public Void call() throws Exception {
    +          // Since we already got one PruneUpperBoundWriter, we need to get NUM_OPS - 1
    +          while (numOps.decrementAndGet() > 0) {
    +            PruneUpperBoundWriter newWriter = supplier.get();
    +            Assert.assertTrue(newWriter == writer);
    +            int waitTime = random.nextInt(10);
    +            TimeUnit.MICROSECONDS.sleep(waitTime);
    +          }
    +          return null;
    +        }
    +      }));
    +    }
    +
    +    for (Future future : futureList) {
    +      future.get(5, TimeUnit.SECONDS);
    +    }
    +    executor.shutdown();
    +    executor.awaitTermination(2, TimeUnit.SECONDS);
    +
    +    futureList.clear();
    +    numOps.set(NUM_OPS);
    +    // Start thread that release PruneUpperBoundWriters
    +    executor = Executors.newFixedThreadPool(3);
    +    for (int i = 0; i < 3; i++) {
    +      futureList.add(executor.submit(new Callable<Void>() {
    +
    +        @Override
    +        public Void call() throws Exception {
    +          // We need to release all NUM_OPS 'gets' that were executed to trigger shutdown of the single instance of
    +          // PruneUpperBoundWriter
    +          while (numOps.decrementAndGet() >= 0) {
    --- End diff --
    
    Can we change this to leave one instance out, so that we can assert that that instance is still running after the n-1 instances were released back. We can later release it separately and then assert that it is not running.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100201292
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---
    @@ -18,77 +18,85 @@
     
     package org.apache.tephra.hbase.txprune;
     
    +import com.google.common.util.concurrent.AbstractIdleService;
     import org.apache.commons.logging.Log;
     import org.apache.commons.logging.LogFactory;
     import org.apache.hadoop.hbase.TableName;
    -import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
     
     import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentSkipListMap;
     import java.util.concurrent.TimeUnit;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -import java.util.concurrent.atomic.AtomicLong;
     
     /**
      * Thread that will write the the prune upper bound
      */
    -public class PruneUpperBoundWriter {
    -  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
    +public class PruneUpperBoundWriter extends AbstractIdleService {
    --- End diff --
    
    It would be good to add a comment saying this should not be started/stopped by itself. Instead use `PruneUpperBoundWriterSupplier`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra issue #32: (TEPHRA-215) (TEPHRA-218) Use single thread acro...

Posted by gokulavasan <gi...@git.apache.org>.
Github user gokulavasan commented on the issue:

    https://github.com/apache/incubator-tephra/pull/32
  
    @poornachandra Ported changes to other compat modules. Please review when you get a chance.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r99879068
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java ---
    @@ -0,0 +1,52 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tephra.hbase.txprune;
    +
    +
    +import com.google.common.base.Supplier;
    +
    +/**
    + * Supplies instances of {@link PruneUpperBoundWriter} implementations.
    + */
    +public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
    +
    +  private final DataJanitorState dataJanitorState;
    +  private final long pruneFlushInterval;
    +
    +  protected static volatile PruneUpperBoundWriter instance;
    --- End diff --
    
    I don't think this needs to be protected


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100200476
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---
    @@ -18,77 +18,85 @@
     
     package org.apache.tephra.hbase.txprune;
     
    +import com.google.common.util.concurrent.AbstractIdleService;
     import org.apache.commons.logging.Log;
     import org.apache.commons.logging.LogFactory;
     import org.apache.hadoop.hbase.TableName;
    -import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
     
     import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentSkipListMap;
     import java.util.concurrent.TimeUnit;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -import java.util.concurrent.atomic.AtomicLong;
     
     /**
      * Thread that will write the the prune upper bound
      */
    -public class PruneUpperBoundWriter {
    -  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
    +public class PruneUpperBoundWriter extends AbstractIdleService {
    +  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
     
    -  private final TableName pruneStateTable;
    +  private final TableName tableName;
       private final DataJanitorState dataJanitorState;
    -  private final byte[] regionName;
    -  private final String regionNameAsString;
       private final long pruneFlushInterval;
    -  private final AtomicLong pruneUpperBound;
    -  private final AtomicBoolean shouldFlush;
    +  private final ConcurrentSkipListMap<ByteBuffer, Long> pruneEntries;
    +
    +  private volatile boolean stopped;
    +  private volatile Thread flushThread;
     
    -  private Thread flushThread;
       private long lastChecked;
     
    -  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
    -                               byte[] regionName, long pruneFlushInterval) {
    -    this.pruneStateTable = pruneStateTable;
    +  public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
    +    this.tableName = tableName;
         this.dataJanitorState = dataJanitorState;
    -    this.regionName = regionName;
    -    this.regionNameAsString = regionNameAsString;
         this.pruneFlushInterval = pruneFlushInterval;
    -    this.pruneUpperBound = new AtomicLong();
    -    this.shouldFlush = new AtomicBoolean(false);
    -    startFlushThread();
    +    this.pruneEntries = new ConcurrentSkipListMap<>();
    +  }
    +
    +  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
    +    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
    +    // grow indefinitely
    +    pruneEntries.put(ByteBuffer.wrap(regionName), pruneUpperBound);
       }
     
       public boolean isAlive() {
    -    return flushThread.isAlive();
    +    return flushThread != null && flushThread.isAlive();
       }
     
    -  public void persistPruneEntry(long pruneUpperBound) {
    -    this.pruneUpperBound.set(pruneUpperBound);
    -    this.shouldFlush.set(true);
    +  @Override
    +  protected void startUp() throws Exception {
    +    LOG.info("Starting PruneUpperBoundWriter Thread.");
    +    startFlushThread();
       }
     
    -  public void stop() {
    +  @Override
    +  protected void shutDown() throws Exception {
         if (flushThread != null) {
    +      stopped = true;
    +      LOG.info("Stopping PruneUpperBoundWriter Thread.");
           flushThread.interrupt();
    +      flushThread.join(TimeUnit.SECONDS.toMillis(1));
         }
       }
     
       private void startFlushThread() {
         flushThread = new Thread("tephra-prune-upper-bound-writer") {
           @Override
           public void run() {
    -        while (!isInterrupted()) {
    +        while (!isInterrupted() && !stopped) {
    --- End diff --
    
    We can replace the check for `stopped` with `isRunning()`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100202801
  
    --- Diff: tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.txprune;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +/**
    + * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}.
    + */
    +public class PruneUpperBoundWriterSupplierTest {
    +  private static final int NUM_OPS = 10;
    +
    +  @Test
    +  public void testSupplier() throws Exception {
    +    final PruneUpperBoundWriterSupplier supplier = new PruneUpperBoundWriterSupplier(null, null, 10L);
    +    final PruneUpperBoundWriter writer = supplier.get();
    +    final AtomicInteger numOps = new AtomicInteger(NUM_OPS);
    +    final Random random = new Random(System.currentTimeMillis());
    +
    +    // Start threads that will 'get' PruneUpperBoundWriters
    +    ExecutorService executor = Executors.newFixedThreadPool(3);
    +    List<Future> futureList = new ArrayList<>();
    +    for (int i = 0; i < 3; i++) {
    +      futureList.add(executor.submit(new Callable<Void>() {
    --- End diff --
    
    `Callable` can be replaced with `Runnable` so that the return statement is not required.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by gokulavasan <gi...@git.apache.org>.
Github user gokulavasan commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r99905573
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---
    @@ -18,55 +18,48 @@
     
     package org.apache.tephra.hbase.txprune;
     
    +import com.google.common.util.concurrent.AbstractIdleService;
     import org.apache.commons.logging.Log;
     import org.apache.commons.logging.LogFactory;
    -import org.apache.hadoop.hbase.TableName;
    -import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
     
     import java.io.IOException;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Queue;
    +import java.util.concurrent.ConcurrentLinkedQueue;
     import java.util.concurrent.TimeUnit;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -import java.util.concurrent.atomic.AtomicLong;
     
     /**
      * Thread that will write the the prune upper bound
      */
    -public class PruneUpperBoundWriter {
    -  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
    +public class PruneUpperBoundWriter extends AbstractIdleService {
    +  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
     
    -  private final TableName pruneStateTable;
       private final DataJanitorState dataJanitorState;
    -  private final byte[] regionName;
    -  private final String regionNameAsString;
       private final long pruneFlushInterval;
    -  private final AtomicLong pruneUpperBound;
    -  private final AtomicBoolean shouldFlush;
    +  private final Queue<PruneInfo> pruneEntries;
     
       private Thread flushThread;
       private long lastChecked;
     
    -  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
    -                               byte[] regionName, long pruneFlushInterval) {
    -    this.pruneStateTable = pruneStateTable;
    +  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, long pruneFlushInterval) {
         this.dataJanitorState = dataJanitorState;
    -    this.regionName = regionName;
    -    this.regionNameAsString = regionNameAsString;
         this.pruneFlushInterval = pruneFlushInterval;
    -    this.pruneUpperBound = new AtomicLong();
    -    this.shouldFlush = new AtomicBoolean(false);
    +    this.pruneEntries = new ConcurrentLinkedQueue<>();
         startFlushThread();
       }
     
    -  public boolean isAlive() {
    -    return flushThread.isAlive();
    +  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
    +    pruneEntries.add(new PruneInfo(regionName, pruneUpperBound));
    --- End diff --
    
    @poornachandra Should that max size be configurable?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100043323
  
    --- Diff: tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java ---
    @@ -334,6 +337,45 @@ public Table get() throws IOException {
         }
       }
     
    +  @Test(timeout = 60000L)
    +  public void testClusterShutdown() throws Exception {
    +    java.util.concurrent.ExecutorService executorService = Executors.newSingleThreadExecutor();
    +    try {
    +      // Create a new transaction snapshot
    +      InMemoryTransactionStateCache.setTransactionSnapshot(
    +        new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
    +                                ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
    +      // Run major compaction
    --- End diff --
    
    The comment can be changed to - `Try to create a situation where HBase shuts down when the async prune writer thread is trying to write to the prune table on a compaction. This will surface any deadlocks during HBase shutdown.`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100042655
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tephra.hbase.txprune;
    +
    +
    +import com.google.common.base.Supplier;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * Supplies instances of {@link PruneUpperBoundWriter} implementations.
    + */
    +public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
    +  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
    +
    +  private static volatile PruneUpperBoundWriter instance;
    +  private static volatile int refCount = 0;
    +  private static final Object lock = new Object();
    +
    +  private final DataJanitorState dataJanitorState;
    +  private final long pruneFlushInterval;
    +
    +  public PruneUpperBoundWriterSupplier(DataJanitorState dataJanitorState, long pruneFlushInterval) {
    +    this.dataJanitorState = dataJanitorState;
    +    this.pruneFlushInterval = pruneFlushInterval;
    +  }
    +
    +  @Override
    +  public PruneUpperBoundWriter get() {
    +    synchronized (lock) {
    +      if (instance == null) {
    +        instance = new PruneUpperBoundWriter(dataJanitorState, pruneFlushInterval);
    +        instance.startAndWait();
    +      }
    +      refCount++;
    +      return instance;
    +    }
    +  }
    +
    +  public void release() {
    +    synchronized (lock) {
    +      refCount--;
    --- End diff --
    
    Same here, decrement refCount outside lock and return if refCount is greater than zero. Only lock when stopping.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100028436
  
    --- Diff: tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCache.java ---
    @@ -70,7 +71,11 @@ public void setConf(Configuration conf) {
     
       @Override
       protected void startUp() throws Exception {
    -    refreshState();
    +    try {
    +      refreshState();
    +    } catch (IOException ioe) {
    +      LOG.info("Error refreshing transaction state cache: " + ioe.getMessage());
    --- End diff --
    
    Would be a good idea to log the stack trace


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100040267
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---
    @@ -18,56 +18,56 @@
     
     package org.apache.tephra.hbase.txprune;
     
    +import com.google.common.util.concurrent.AbstractIdleService;
     import org.apache.commons.logging.Log;
     import org.apache.commons.logging.LogFactory;
    -import org.apache.hadoop.hbase.TableName;
    -import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
     
     import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentSkipListMap;
     import java.util.concurrent.TimeUnit;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -import java.util.concurrent.atomic.AtomicLong;
     
     /**
      * Thread that will write the the prune upper bound
      */
    -public class PruneUpperBoundWriter {
    -  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
    +public class PruneUpperBoundWriter extends AbstractIdleService {
    +  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
     
    -  private final TableName pruneStateTable;
       private final DataJanitorState dataJanitorState;
    -  private final byte[] regionName;
    -  private final String regionNameAsString;
       private final long pruneFlushInterval;
    -  private final AtomicLong pruneUpperBound;
    -  private final AtomicBoolean shouldFlush;
    +  private final ConcurrentSkipListMap<ByteBuffer, Long> pruneEntries;
    --- End diff --
    
    It would be simpler to use `ConcurrentSkipListMap<byte[], Long>`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100039577
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---
    @@ -18,56 +18,56 @@
     
     package org.apache.tephra.hbase.txprune;
     
    +import com.google.common.util.concurrent.AbstractIdleService;
     import org.apache.commons.logging.Log;
     import org.apache.commons.logging.LogFactory;
    -import org.apache.hadoop.hbase.TableName;
    -import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
     
     import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentSkipListMap;
     import java.util.concurrent.TimeUnit;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -import java.util.concurrent.atomic.AtomicLong;
     
     /**
      * Thread that will write the the prune upper bound
      */
    -public class PruneUpperBoundWriter {
    -  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
    +public class PruneUpperBoundWriter extends AbstractIdleService {
    +  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
     
    -  private final TableName pruneStateTable;
       private final DataJanitorState dataJanitorState;
    -  private final byte[] regionName;
    -  private final String regionNameAsString;
       private final long pruneFlushInterval;
    -  private final AtomicLong pruneUpperBound;
    -  private final AtomicBoolean shouldFlush;
    +  private final ConcurrentSkipListMap<ByteBuffer, Long> pruneEntries;
    +
    +  private volatile boolean stopped;
    +  private volatile Thread flushThread;
     
    -  private Thread flushThread;
       private long lastChecked;
     
    -  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
    -                               byte[] regionName, long pruneFlushInterval) {
    -    this.pruneStateTable = pruneStateTable;
    +  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, long pruneFlushInterval) {
         this.dataJanitorState = dataJanitorState;
    -    this.regionName = regionName;
    -    this.regionNameAsString = regionNameAsString;
         this.pruneFlushInterval = pruneFlushInterval;
    -    this.pruneUpperBound = new AtomicLong();
    -    this.shouldFlush = new AtomicBoolean(false);
    -    startFlushThread();
    +    this.pruneEntries = new ConcurrentSkipListMap<>();
    +  }
    +
    +  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
    +    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
    +    // grow indefinitely
    +    pruneEntries.put(ByteBuffer.wrap(regionName), pruneUpperBound);
       }
     
       public boolean isAlive() {
    -    return flushThread.isAlive();
    +    return flushThread != null && flushThread.isAlive();
       }
     
    -  public void persistPruneEntry(long pruneUpperBound) {
    -    this.pruneUpperBound.set(pruneUpperBound);
    -    this.shouldFlush.set(true);
    +  @Override
    +  protected void startUp() throws Exception {
    +    startFlushThread();
       }
     
    -  public void stop() {
    +  @Override
    +  protected void shutDown() throws Exception {
         if (flushThread != null) {
    +      stopped = true;
           flushThread.interrupt();
         }
    --- End diff --
    
    Also add `flushThread.join()` after the interrupt to wait for the flush thread to stop.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r99879750
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---
    @@ -18,55 +18,48 @@
     
     package org.apache.tephra.hbase.txprune;
     
    +import com.google.common.util.concurrent.AbstractIdleService;
     import org.apache.commons.logging.Log;
     import org.apache.commons.logging.LogFactory;
    -import org.apache.hadoop.hbase.TableName;
    -import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
     
     import java.io.IOException;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Queue;
    +import java.util.concurrent.ConcurrentLinkedQueue;
     import java.util.concurrent.TimeUnit;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -import java.util.concurrent.atomic.AtomicLong;
     
     /**
      * Thread that will write the the prune upper bound
      */
    -public class PruneUpperBoundWriter {
    -  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
    +public class PruneUpperBoundWriter extends AbstractIdleService {
    +  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
     
    -  private final TableName pruneStateTable;
       private final DataJanitorState dataJanitorState;
    -  private final byte[] regionName;
    -  private final String regionNameAsString;
       private final long pruneFlushInterval;
    -  private final AtomicLong pruneUpperBound;
    -  private final AtomicBoolean shouldFlush;
    +  private final Queue<PruneInfo> pruneEntries;
     
       private Thread flushThread;
       private long lastChecked;
     
    -  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
    -                               byte[] regionName, long pruneFlushInterval) {
    -    this.pruneStateTable = pruneStateTable;
    +  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, long pruneFlushInterval) {
         this.dataJanitorState = dataJanitorState;
    -    this.regionName = regionName;
    -    this.regionNameAsString = regionNameAsString;
         this.pruneFlushInterval = pruneFlushInterval;
    -    this.pruneUpperBound = new AtomicLong();
    -    this.shouldFlush = new AtomicBoolean(false);
    +    this.pruneEntries = new ConcurrentLinkedQueue<>();
         startFlushThread();
    --- End diff --
    
    Since the flush thread is started in the `startUp` method, we don't need to start it in the constructor.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by gokulavasan <gi...@git.apache.org>.
Github user gokulavasan commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r99903840
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java ---
    @@ -0,0 +1,52 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tephra.hbase.txprune;
    +
    +
    +import com.google.common.base.Supplier;
    +
    +/**
    + * Supplies instances of {@link PruneUpperBoundWriter} implementations.
    + */
    +public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
    +
    +  private final DataJanitorState dataJanitorState;
    +  private final long pruneFlushInterval;
    +
    +  protected static volatile PruneUpperBoundWriter instance;
    +  protected static Object lock = new Object();
    +
    +  public PruneUpperBoundWriterSupplier(DataJanitorState dataJanitorState, long pruneFlushInterval) {
    +    this.dataJanitorState = dataJanitorState;
    +    this.pruneFlushInterval = pruneFlushInterval;
    +  }
    +
    +  @Override
    +  public PruneUpperBoundWriter get() {
    +    if (instance == null) {
    +      synchronized (lock) {
    +        if (instance == null) {
    +          instance = new PruneUpperBoundWriter(dataJanitorState, pruneFlushInterval);
    +          instance.start();
    --- End diff --
    
    @poornachandra Do you have any thoughts on how to address this? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra issue #32: (TEPHRA-215) (TEPHRA-218) Use single thread acro...

Posted by gokulavasan <gi...@git.apache.org>.
Github user gokulavasan commented on the issue:

    https://github.com/apache/incubator-tephra/pull/32
  
    @poornachandra Addressed comments. Please take a look when you get a chance.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r99882082
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java ---
    @@ -0,0 +1,52 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tephra.hbase.txprune;
    +
    +
    +import com.google.common.base.Supplier;
    +
    +/**
    + * Supplies instances of {@link PruneUpperBoundWriter} implementations.
    + */
    +public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
    +
    +  private final DataJanitorState dataJanitorState;
    +  private final long pruneFlushInterval;
    +
    +  protected static volatile PruneUpperBoundWriter instance;
    +  protected static Object lock = new Object();
    +
    +  public PruneUpperBoundWriterSupplier(DataJanitorState dataJanitorState, long pruneFlushInterval) {
    +    this.dataJanitorState = dataJanitorState;
    +    this.pruneFlushInterval = pruneFlushInterval;
    +  }
    +
    +  @Override
    +  public PruneUpperBoundWriter get() {
    +    if (instance == null) {
    +      synchronized (lock) {
    +        if (instance == null) {
    +          instance = new PruneUpperBoundWriter(dataJanitorState, pruneFlushInterval);
    +          instance.start();
    --- End diff --
    
    We have the same problem as `TransactionStateCache` here too - we don't have a way to stop this thread after all instances of `TransactionProcessor` co-processors have been shutdown (https://issues.apache.org/jira/browse/TEPHRA-152)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100202630
  
    --- Diff: tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.txprune;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +/**
    + * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}.
    + */
    +public class PruneUpperBoundWriterSupplierTest {
    +  private static final int NUM_OPS = 10;
    --- End diff --
    
    Since this is a fast operation, let's make this a bigger number. 1000 maybe?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100040861
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---
    @@ -76,18 +76,21 @@ private void startFlushThread() {
         flushThread = new Thread("tephra-prune-upper-bound-writer") {
           @Override
           public void run() {
    -        while (!isInterrupted()) {
    +        while (!isInterrupted() && !stopped) {
               long now = System.currentTimeMillis();
               if (now > (lastChecked + pruneFlushInterval)) {
    -            if (shouldFlush.compareAndSet(true, false)) {
    +            if (!pruneEntries.isEmpty()) {
                   // should flush data
                   try {
    -                dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound.get());
    +                while (pruneEntries.firstEntry() != null) {
    +                  Map.Entry<ByteBuffer, Long> firstEntry = pruneEntries.firstEntry();
    +                  dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey().array(), firstEntry.getValue());
    +                  // We can now remove the entry only if the key and value match with what we wrote since it is
    +                  // possible that a new pruneUpperBound for the same key has been added
    +                  pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
    +                }
                   } catch (IOException ex) {
    -                LOG.warn("Cannot record prune upper bound for region " + regionNameAsString + " in the table " +
    -                           pruneStateTable.getNameWithNamespaceInclAsString() + " after compacting region.", ex);
    -                // Retry again
    -                shouldFlush.set(true);
    +                LOG.warn("Cannot record prune upper bound for a region.", ex);
    --- End diff --
    
    It would be good to add prune state table name to the log message, so that it can help in figuring out mis-configurations if any


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra issue #32: (TEPHRA-215) (TEPHRA-218) Use single thread acro...

Posted by gokulavasan <gi...@git.apache.org>.
Github user gokulavasan commented on the issue:

    https://github.com/apache/incubator-tephra/pull/32
  
    @poornachandra Can you please take a look again. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra issue #32: (TEPHRA-215) (TEPHRA-218) Use single thread acro...

Posted by gokulavasan <gi...@git.apache.org>.
Github user gokulavasan commented on the issue:

    https://github.com/apache/incubator-tephra/pull/32
  
    @poornachandra Please review when you get a chance. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100193873
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---
    @@ -18,56 +18,56 @@
     
     package org.apache.tephra.hbase.txprune;
     
    +import com.google.common.util.concurrent.AbstractIdleService;
     import org.apache.commons.logging.Log;
     import org.apache.commons.logging.LogFactory;
    -import org.apache.hadoop.hbase.TableName;
    -import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
     
     import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentSkipListMap;
     import java.util.concurrent.TimeUnit;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -import java.util.concurrent.atomic.AtomicLong;
     
     /**
      * Thread that will write the the prune upper bound
      */
    -public class PruneUpperBoundWriter {
    -  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
    +public class PruneUpperBoundWriter extends AbstractIdleService {
    +  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
     
    -  private final TableName pruneStateTable;
       private final DataJanitorState dataJanitorState;
    -  private final byte[] regionName;
    -  private final String regionNameAsString;
       private final long pruneFlushInterval;
    -  private final AtomicLong pruneUpperBound;
    -  private final AtomicBoolean shouldFlush;
    +  private final ConcurrentSkipListMap<ByteBuffer, Long> pruneEntries;
    --- End diff --
    
    Use `new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR)`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100202356
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java ---
    @@ -0,0 +1,84 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tephra.hbase.txprune;
    +
    +
    +import com.google.common.base.Supplier;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.hbase.TableName;
    +
    +/**
    + * Supplies instances of {@link PruneUpperBoundWriter} implementations.
    + */
    +public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
    +  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
    +
    +  private static volatile PruneUpperBoundWriter instance;
    +  private static volatile int refCount = 0;
    +  private static final Object lock = new Object();
    +
    +  private final TableName tableName;
    +  private final DataJanitorState dataJanitorState;
    +  private final long pruneFlushInterval;
    +
    +  public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState,
    +                                       long pruneFlushInterval) {
    +    this.tableName = tableName;
    +    this.dataJanitorState = dataJanitorState;
    +    this.pruneFlushInterval = pruneFlushInterval;
    +  }
    +
    +  @Override
    +  public PruneUpperBoundWriter get() {
    +    synchronized (lock) {
    +      if (instance == null) {
    +        instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
    +        instance.startAndWait();
    +      }
    +      refCount++;
    +      LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount);
    --- End diff --
    
    Would be good to enclose this log statement with `LOG.isDebugEnabled()`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100054015
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---
    @@ -18,56 +18,56 @@
     
     package org.apache.tephra.hbase.txprune;
     
    +import com.google.common.util.concurrent.AbstractIdleService;
     import org.apache.commons.logging.Log;
     import org.apache.commons.logging.LogFactory;
    -import org.apache.hadoop.hbase.TableName;
    -import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
     
     import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentSkipListMap;
     import java.util.concurrent.TimeUnit;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -import java.util.concurrent.atomic.AtomicLong;
     
     /**
      * Thread that will write the the prune upper bound
      */
    -public class PruneUpperBoundWriter {
    -  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
    +public class PruneUpperBoundWriter extends AbstractIdleService {
    +  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
     
    -  private final TableName pruneStateTable;
       private final DataJanitorState dataJanitorState;
    -  private final byte[] regionName;
    -  private final String regionNameAsString;
       private final long pruneFlushInterval;
    -  private final AtomicLong pruneUpperBound;
    -  private final AtomicBoolean shouldFlush;
    +  private final ConcurrentSkipListMap<ByteBuffer, Long> pruneEntries;
    +
    +  private volatile boolean stopped;
    +  private volatile Thread flushThread;
     
    -  private Thread flushThread;
       private long lastChecked;
     
    -  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
    -                               byte[] regionName, long pruneFlushInterval) {
    -    this.pruneStateTable = pruneStateTable;
    +  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, long pruneFlushInterval) {
         this.dataJanitorState = dataJanitorState;
    -    this.regionName = regionName;
    -    this.regionNameAsString = regionNameAsString;
         this.pruneFlushInterval = pruneFlushInterval;
    -    this.pruneUpperBound = new AtomicLong();
    -    this.shouldFlush = new AtomicBoolean(false);
    -    startFlushThread();
    +    this.pruneEntries = new ConcurrentSkipListMap<>();
    +  }
    +
    +  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
    +    // The number of entries in this map is bound by the number of regions in this region server and thus it will not
    +    // grow indefinitely
    +    pruneEntries.put(ByteBuffer.wrap(regionName), pruneUpperBound);
       }
     
       public boolean isAlive() {
    -    return flushThread.isAlive();
    +    return flushThread != null && flushThread.isAlive();
       }
     
    -  public void persistPruneEntry(long pruneUpperBound) {
    -    this.pruneUpperBound.set(pruneUpperBound);
    -    this.shouldFlush.set(true);
    +  @Override
    +  protected void startUp() throws Exception {
    +    startFlushThread();
       }
     
    -  public void stop() {
    +  @Override
    +  protected void shutDown() throws Exception {
         if (flushThread != null) {
    --- End diff --
    
    Also would be good to add a log statement saying "stopping PruneUpperBoundWriter" and "stopped PruneUpperBoundWriter"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100202379
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java ---
    @@ -0,0 +1,84 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tephra.hbase.txprune;
    +
    +
    +import com.google.common.base.Supplier;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.hbase.TableName;
    +
    +/**
    + * Supplies instances of {@link PruneUpperBoundWriter} implementations.
    + */
    +public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
    +  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
    +
    +  private static volatile PruneUpperBoundWriter instance;
    +  private static volatile int refCount = 0;
    +  private static final Object lock = new Object();
    +
    +  private final TableName tableName;
    +  private final DataJanitorState dataJanitorState;
    +  private final long pruneFlushInterval;
    +
    +  public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState,
    +                                       long pruneFlushInterval) {
    +    this.tableName = tableName;
    +    this.dataJanitorState = dataJanitorState;
    +    this.pruneFlushInterval = pruneFlushInterval;
    +  }
    +
    +  @Override
    +  public PruneUpperBoundWriter get() {
    +    synchronized (lock) {
    +      if (instance == null) {
    +        instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
    +        instance.startAndWait();
    +      }
    +      refCount++;
    +      LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount);
    +      return instance;
    +    }
    +  }
    +
    +  public void release() {
    +    synchronized (lock) {
    +      refCount--;
    +      LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount);
    --- End diff --
    
    Same here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r100208194
  
    --- Diff: tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java ---
    @@ -74,30 +81,35 @@ public Void call() throws Exception {
         futureList.clear();
         numOps.set(NUM_OPS);
         // Start thread that release PruneUpperBoundWriters
    -    executor = Executors.newFixedThreadPool(3);
    -    for (int i = 0; i < 3; i++) {
    -      futureList.add(executor.submit(new Callable<Void>() {
    +    executor = Executors.newFixedThreadPool(NUM_THREADS);
    +    for (int i = 0; i < NUM_THREADS; i++) {
    +      futureList.add(executor.submit(new Runnable() {
     
             @Override
    -        public Void call() throws Exception {
    +        public void run() {
               // We need to release all NUM_OPS 'gets' that were executed to trigger shutdown of the single instance of
               // PruneUpperBoundWriter
    -          while (numOps.decrementAndGet() >= 0) {
    +          while (numOps.decrementAndGet() > 0) {
                 supplier.release();
    -            TimeUnit.MICROSECONDS.sleep(random.nextInt(10));
    +            try {
    +              TimeUnit.MICROSECONDS.sleep(random.nextInt(10));
    +            } catch (InterruptedException e) {
    +              LOG.warn("Received an exception.", e);
    +            }
               }
    -          return null;
             }
           }));
         }
     
         for (Future future : futureList) {
           future.get(1, TimeUnit.SECONDS);
         }
    +    // Since we got one instance in the beginning, we need to release it
    +    supplier.release();
    --- End diff --
    
    It would be good to assert that the writer is running and alive before we do the final release


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-tephra pull request #32: (TEPHRA-215) (TEPHRA-218) Use single thre...

Posted by poornachandra <gi...@git.apache.org>.
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/32#discussion_r99880416
  
    --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---
    @@ -18,55 +18,48 @@
     
     package org.apache.tephra.hbase.txprune;
     
    +import com.google.common.util.concurrent.AbstractIdleService;
     import org.apache.commons.logging.Log;
     import org.apache.commons.logging.LogFactory;
    -import org.apache.hadoop.hbase.TableName;
    -import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
     
     import java.io.IOException;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Queue;
    +import java.util.concurrent.ConcurrentLinkedQueue;
     import java.util.concurrent.TimeUnit;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -import java.util.concurrent.atomic.AtomicLong;
     
     /**
      * Thread that will write the the prune upper bound
      */
    -public class PruneUpperBoundWriter {
    -  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
    +public class PruneUpperBoundWriter extends AbstractIdleService {
    +  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
     
    -  private final TableName pruneStateTable;
       private final DataJanitorState dataJanitorState;
    -  private final byte[] regionName;
    -  private final String regionNameAsString;
       private final long pruneFlushInterval;
    -  private final AtomicLong pruneUpperBound;
    -  private final AtomicBoolean shouldFlush;
    +  private final Queue<PruneInfo> pruneEntries;
     
       private Thread flushThread;
       private long lastChecked;
     
    -  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
    -                               byte[] regionName, long pruneFlushInterval) {
    -    this.pruneStateTable = pruneStateTable;
    +  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, long pruneFlushInterval) {
         this.dataJanitorState = dataJanitorState;
    -    this.regionName = regionName;
    -    this.regionNameAsString = regionNameAsString;
         this.pruneFlushInterval = pruneFlushInterval;
    -    this.pruneUpperBound = new AtomicLong();
    -    this.shouldFlush = new AtomicBoolean(false);
    +    this.pruneEntries = new ConcurrentLinkedQueue<>();
         startFlushThread();
       }
     
    -  public boolean isAlive() {
    -    return flushThread.isAlive();
    +  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
    +    pruneEntries.add(new PruneInfo(regionName, pruneUpperBound));
    --- End diff --
    
    I think it would be good to limit the number of entries in the `pruneEntries` queue. If we are over the limit then we can drop earlier elements.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---