Posted to dev@accumulo.apache.org by ShawnWalker <gi...@git.apache.org> on 2016/04/12 22:02:04 UTC

[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

GitHub user ShawnWalker opened a pull request:

    https://github.com/apache/accumulo/pull/90

    ACCUMULO-4187: Added rate limiting for major compactions.

    Added configuration property `tserver.compaction.major.throughput` of type `PropertyType.MEMORY` to control rate limiting of major compactions on each tserver.
      
    Specifying a value of `0B` (the default) disables rate limiting.
    
    If a positive value is specified, then all tablet servers will limit the I/O performed during major compaction accordingly.  For example, with `tserver.compaction.major.throughput=30M`, each tserver will read no more than 30MiB per second and write no more than 30MiB per second, with each limit applying to the combined I/O of all major compaction threads.
    
    This change involved adding an optional `RateLimiter` parameter to `FileOperations.openReader(...)` and `FileOperations.openWriter(...)`.  Most of the file changes involve adding an appropriate `null` to invocations of these methods.
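
To illustrate the decorator approach described above, here is a minimal sketch of a stream wrapper that asks a limiter for permits before each write. It is not the code from the pull request: `SimpleRateLimiter`, `ThrottledOutputStream` and `ThrottledOutputDemo` are hypothetical names, and the counting limiter only records requests where a real one would block. A `null` limiter is treated as "no limit", mirroring the optional parameter.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class ThrottledOutputDemo {
  // Hypothetical stand-in for the pull request's RateLimiter interface.
  interface SimpleRateLimiter {
    // Blocks until the requested number of permits (here: bytes) may be used.
    void acquire(long permits);
  }

  // Decorator that requests permits from the limiter before every write.
  static class ThrottledOutputStream extends FilterOutputStream {
    private final SimpleRateLimiter limiter; // null means "no limit"

    ThrottledOutputStream(OutputStream out, SimpleRateLimiter limiter) {
      super(out);
      this.limiter = limiter;
    }

    @Override
    public void write(int b) throws IOException {
      if (limiter != null) {
        limiter.acquire(1);
      }
      out.write(b);
    }

    @Override
    public void write(byte[] buf, int off, int len) throws IOException {
      if (limiter != null) {
        limiter.acquire(len);
      }
      out.write(buf, off, len);
    }
  }

  // Writes the payload through a throttled stream and returns the permits requested.
  static long permitsFor(byte[] payload) {
    final long[] requested = {0};
    SimpleRateLimiter counting = new SimpleRateLimiter() {
      @Override
      public void acquire(long permits) {
        requested[0] += permits; // a real limiter would block here
      }
    };
    try (OutputStream out = new ThrottledOutputStream(new ByteArrayOutputStream(), counting)) {
      out.write(payload);
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
    return requested[0];
  }

  public static void main(String[] args) {
    System.out.println(permitsFor(new byte[4096])); // prints 4096
  }
}
```

Sharing one limiter instance across every stream opened by the compaction threads is what would make the limit apply to their combined throughput.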

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ShawnWalker/accumulo ACCUMULO-4187

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/accumulo/pull/90.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #90
    
----
commit 51396335b275b980c09964ad3ca95bf2a6fe5842
Author: Shawn Walker <ac...@shawn-walker.net>
Date:   2016-04-06T17:18:09Z

    ACCUMULO-4187: Added rate limiting for major compactions.
    
    Added configuration property tserver.compaction.major.throughput of type PropertyType.MEMORY with a default of 0B (unlimited).  If another value is specified (e.g. 30M), then all tablet servers will limit the I/O performed during major compaction accordingly (e.g. neither reading nor writing more than 30MiB per second combined over all major compaction threads).

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ctubbsii <gi...@git.apache.org>.
Github user ctubbsii commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59459590
  
    --- Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.server.util.ratelimit;
    +
    +import com.google.common.collect.ImmutableList;
    +import java.util.List;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.atomic.LongAdder;
    --- End diff --
    
    Unfortunately, LongAdder is only available in JDK8, and we're still on JDK7.
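
One JDK7-compatible option, sketched here as an assumption rather than as the change that was actually made, is `java.util.concurrent.atomic.AtomicLong`, whose `getAndSet(0)` can play the role of `sumThenReset()`. The class name `PermitCounter` is hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

class PermitCounter {
  private final AtomicLong permitsAcquired = new AtomicLong();

  // Counterpart of LongAdder.add(long).
  void add(long permits) {
    permitsAcquired.addAndGet(permits);
  }

  // Counterpart of LongAdder.sumThenReset(): atomically returns the total and zeroes it.
  long sumThenReset() {
    return permitsAcquired.getAndSet(0L);
  }

  public static void main(String[] args) {
    PermitCounter counter = new PermitCounter();
    counter.add(100);
    counter.add(23);
    System.out.println(counter.sumThenReset()); // prints 123
    System.out.println(counter.sumThenReset()); // prints 0
  }
}
```

`AtomicLong` can be slower than `LongAdder` under heavy contention, but each update here accompanies an I/O call, so the difference is unlikely to matter.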



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59489500
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java ---
    @@ -132,7 +136,8 @@ FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, Accumul
           sampler = SamplerFactory.newSampler(samplerConfig, acuconf);
         }
     
    -    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf, acuconf);
    +    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(new RateLimitedOutputStream(fs.create(new Path(file), false, bufferSize, (short) rep, block),
    --- End diff --
    
    nit: I would be ok with splitting up this line. It's getting a little busy :)



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by benjumanji <gi...@git.apache.org>.
Github user benjumanji commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59581047
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.core.file.streams;
    +
    +import java.io.FilterOutputStream;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +
    +public class PositionedOutputs {
    +  public static PositionedOutputStream wrap(final OutputStream fout) {
    +    if (fout instanceof FSDataOutputStream) {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          return ((FSDataOutputStream) fout).getPos();
    +        }
    +      };
    +    } else if (fout instanceof PositionedOutput) {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          return ((PositionedOutput) fout).position();
    +        }
    +      };
    +    } else {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          throw new UnsupportedOperationException("Underlying stream does not support position()");
    +        }
    +      };
    +    }
    +  }
    +
    +  public static abstract class PositionedOutputStream extends FilterOutputStream implements PositionedOutput {
    --- End diff --
    
    `<T extends OutputStream & PositionedOutput> T foo()` would work? Bit gross though.
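
For readers unfamiliar with intersection bounds, the sketch below shows the syntax in use. `PositionedOutput` here is a hypothetical stand-in for the interface in the diff, and `CountingStream` and `writeAndReport` are illustrative names only.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class IntersectionDemo {
  // Hypothetical stand-in for the PositionedOutput interface in the diff.
  interface PositionedOutput {
    long position() throws IOException;
  }

  // A stream that is both an OutputStream and a PositionedOutput.
  static class CountingStream extends ByteArrayOutputStream implements PositionedOutput {
    @Override
    public long position() {
      return size(); // bytes written so far
    }
  }

  // The intersection bound lets the body use methods of both types without a cast.
  static <T extends OutputStream & PositionedOutput> long writeAndReport(T out, byte[] data) {
    try {
      out.write(data);
      return out.position();
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(writeAndReport(new CountingStream(), new byte[10])); // prints 10
  }
}
```

The bound reads cleanly on a parameter. As a return type, `T foo()` leaves the choice of `T` to the caller, so the method body cannot construct a value of that type without an unchecked cast, which may be the "gross" part.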



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on the pull request:

    https://github.com/apache/accumulo/pull/90#issuecomment-209647745
  
    I played around with this branch locally.  I created a table with 10,000,000 entries by running TestIngest with the following commands.
    
    ```
    ./bin/accumulo shell -u root -p secret -e "createtable test_ingest"
    ./bin/accumulo org.apache.accumulo.test.TestIngest -u root -p secret --timestamp 1 --size 50 --random 56 --rows 10000000 --start 0 --cols 1 --instance instance16
    ```
    I set the rate limit to 5M and forced a compaction.  I saw the following in the tserver logs.
    
    ```
    Compaction 2<< 10,000,000 read | 10,000,000 written | 122,925 entries/sec | 81.350 secs |  431,758,096 bytes | 5307413.596 byte/sec
    ```
    
    Then I split the table into 8 tablets and forced a compaction to test the rate limit for multiple threads.  I had the default of 3 compaction threads.  I saw the following in the logs for this test.
    
    ```
    Compaction 2;row_0003749;row_00025 1,249,000 read | 1,249,000 written | 41,866 entries/sec | 29.833 secs |   53,926,291 bytes | 1807605.370 byte/sec
    Compaction 2;row_00025;row_000125 1,250,000 read | 1,250,000 written | 41,899 entries/sec | 29.833 secs |   53,970,229 bytes | 1809078.168 byte/sec
    Compaction 2;row_000125< 1,250,000 read | 1,250,000 written | 41,783 entries/sec | 29.916 secs |   53,969,343 bytes | 1804029.382 byte/sec
    Compaction 2;row_000625;row_0005 1,250,000 read | 1,250,000 written | 42,134 entries/sec | 29.667 secs |   53,970,847 bytes | 1819221.593 byte/sec
    Compaction 2;row_0005;row_0003749 1,251,000 read | 1,251,000 written | 42,109 entries/sec | 29.708 secs |   54,012,874 bytes | 1818125.555 byte/sec
    Compaction 2;row_00075;row_000625 1,250,000 read | 1,250,000 written | 41,881 entries/sec | 29.846 secs |   53,969,549 bytes | 1808267.406 byte/sec
    Compaction 2;row_000875;row_00075 1,250,000 read | 1,250,000 written | 63,909 entries/sec | 19.559 secs |   53,969,511 bytes | 2759318.523 byte/sec
    Compaction 2<;row_000875 1,250,000 read | 1,250,000 written | 63,798 entries/sec | 19.593 secs |   53,969,987 bytes | 2754554.535 byte/sec
    ```
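
As a rough check of these numbers, the sketch below compares summed per-compaction rates with the configured limit. It assumes that `5M` means 5 MiB per second (5,242,880 bytes/sec) and that the three roughly 1.8 MB/s compactions ran at the same time, as did the two roughly 2.75 MB/s ones. The logs do not state which compactions overlapped, so that grouping is a guess. The class name is hypothetical.

```java
class CompactionRateCheck {
  // Assumption: "5M" is interpreted as 5 MiB per second.
  static final long LIMIT_BYTES_PER_SEC = 5L * 1024 * 1024;

  // Ratio of the combined rate of compactions assumed to be concurrent to the limit.
  static double ratioToLimit(double... bytesPerSec) {
    double total = 0;
    for (double rate : bytesPerSec) {
      total += rate;
    }
    return total / LIMIT_BYTES_PER_SEC;
  }

  public static void main(String[] args) {
    // Single-tablet compaction from the first test.
    System.out.printf("one thread:    %.3f%n", ratioToLimit(5307413.596));
    // First three rates from the eight-tablet test.
    System.out.printf("three threads: %.3f%n", ratioToLimit(1807605.370, 1809078.168, 1804029.382));
    // Last two rates from the eight-tablet test.
    System.out.printf("two threads:   %.3f%n", ratioToLimit(2759318.523, 2754554.535));
  }
}
```

Under those assumptions every group lands within a few percent of the limit, which is consistent with the limit being shared across compaction threads rather than applied per thread.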
    
    




[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59579043
  
    --- Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.server.util.ratelimit;
    +
    +import com.google.common.collect.ImmutableList;
    +import java.util.List;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.atomic.LongAdder;
    +import org.apache.accumulo.core.conf.AccumuloConfiguration;
    +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
    +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
    +import org.apache.accumulo.server.util.time.SimpleTimer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class SharedRateLimiterFactory {
    +  private static final long REPORT_RATE = 60000;
    +  private static final long UPDATE_RATE = 1000;
    +  private static SharedRateLimiterFactory instance = null;
    +  private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
    +  private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
    +
    +  private SharedRateLimiterFactory() {}
    +
    +  public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
    +    synchronized (SharedRateLimiterFactory.class) {
    +      if (instance == null) {
    +        instance = new SharedRateLimiterFactory();
    +
    +        // Update periodically
    +        timer.schedule(new Runnable() {
    +          @Override
    +          public void run() {
    +            instance.update();
    +          }
    +        }, UPDATE_RATE, UPDATE_RATE);
    +
    +        // Report periodically
    +        timer.schedule(new Runnable() {
    +          @Override
    +          public void run() {
    +            instance.report();
    +          }
    +        }, REPORT_RATE, REPORT_RATE);
    +      }
    +      return instance;
    +    }
    +  }
    +
    +  public static SharedRateLimiterFactory getInstance(AccumuloConfiguration conf) {
    +    return getInstance(SimpleTimer.getInstance(conf));
    +  }
    +
    +  /**
    +   * Lookup the RateLimiter associated with the specified name, or create a new one for that name. RateLimiters should be closed when no longer needed.
    +   *
    +   * @param name
    +   *          key for the rate limiter
    +   * @param rateGenerator
    +   *          a function which can be called to get what the current rate for the rate limiter should be.
    +   */
    +  public RateLimiter create(String name, Callable<Long> rateGenerator) {
    +    synchronized (activeLimiters) {
    +      if (activeLimiters.containsKey(name)) {
    +        SharedRateLimiter limiter = activeLimiters.get(name);
    +        return limiter;
    +      } else {
    +        long initialRate;
    +        try {
    +          initialRate = rateGenerator.call();
    +        } catch (Exception ex) {
    +          throw new IllegalStateException(ex);
    +        }
    +        SharedRateLimiter limiter = new SharedRateLimiter(name, rateGenerator, initialRate);
    +        activeLimiters.put(name, limiter);
    +        return limiter;
    +      }
    +    }
    +  }
    +
    +  protected void update() {
    +    List<SharedRateLimiter> limiters;
    +    synchronized (activeLimiters) {
    +      limiters = ImmutableList.copyOf(activeLimiters.values());
    +    }
    +    for (SharedRateLimiter limiter : limiters) {
    +      limiter.update();
    +    }
    +  }
    +
    +  protected void report() {
    +    List<SharedRateLimiter> limiters;
    +    synchronized (activeLimiters) {
    +      limiters = ImmutableList.copyOf(activeLimiters.values());
    +    }
    +    for (SharedRateLimiter limiter : limiters) {
    +      limiter.report();
    +    }
    +  }
    +
    +  protected class SharedRateLimiter extends GuavaRateLimiter {
    +    private final LongAdder permitsAcquired = new LongAdder();
    +    private final Callable<Long> rateCallable;
    +    private final String name;
    +
    +    SharedRateLimiter(String name, Callable<Long> rateCallable, long initialRate) {
    +      super(initialRate);
    +      this.name = name;
    +      this.rateCallable = rateCallable;
    +    }
    +
    +    @Override
    +    public void acquire(long permits) {
    +      super.acquire(permits);
    +      permitsAcquired.add(permits);
    +    }
    +
    +    public void update() {
    +      try {
    +        // Reset rate if needed
    +        long rate = rateCallable.call();
    +        if (rate != getRate()) {
    +          setRate(rate);
    +        }
    +      } catch (Exception ex) {
    +        logger.debug("Failed to update rate limiter", ex);
    +      }
    +    }
    +
    +    public void report() {
    +      long sum = permitsAcquired.sumThenReset();
    +      if (sum > 0) {
    +        logger.debug(String.format("RateLimiter '%s': %,d of %,d permits/second", name, sum * 1000L / REPORT_RATE, getRate()));
    --- End diff --
    
    The timer will not always call report at the requested frequency. The code could track the last report time and use that to calculate the rate.
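
A minimal sketch of that suggestion follows, assuming the caller supplies a monotonic timestamp such as `System.nanoTime()`. `ElapsedRateReporter` is a hypothetical class, not part of the pull request. It divides by the interval that actually elapsed instead of the fixed `REPORT_RATE`.

```java
import java.util.concurrent.atomic.AtomicLong;

class ElapsedRateReporter {
  private final AtomicLong permitsAcquired = new AtomicLong();
  private long lastReportNanos;

  ElapsedRateReporter(long nowNanos) {
    this.lastReportNanos = nowNanos;
  }

  void acquire(long permits) {
    permitsAcquired.addAndGet(permits);
  }

  // Returns permits per second over the interval since the previous report.
  synchronized long report(long nowNanos) {
    long elapsedNanos = nowNanos - lastReportNanos;
    long sum = permitsAcquired.getAndSet(0L);
    lastReportNanos = nowNanos;
    if (elapsedNanos <= 0) {
      return 0L; // no measurable interval, so no meaningful rate
    }
    // Use floating point to avoid overflowing a long when sum is large.
    return (long) (sum / (elapsedNanos / 1e9));
  }

  public static void main(String[] args) {
    ElapsedRateReporter reporter = new ElapsedRateReporter(0L);
    reporter.acquire(1_000_000L);
    // The timer fired after 2 seconds.
    System.out.println(reporter.report(2_000_000_000L)); // prints 500000
  }
}
```

Passing the timestamp in keeps the sketch testable. In a timer callback the argument would typically be `System.nanoTime()`.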



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ctubbsii <gi...@git.apache.org>.
Github user ctubbsii commented on the pull request:

    https://github.com/apache/accumulo/pull/90#issuecomment-210122789
  
    @ShawnWalker That sounds great! If you can separate out the issue with ShellServerIT as a separate issue, that'd be helpful. I'm guessing it affects older branches, as well.



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ctubbsii <gi...@git.apache.org>.
Github user ctubbsii commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59597441
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/FileOperations.java ---
    @@ -55,23 +56,24 @@ public static FileOperations getInstance() {
        */
     
       public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
    -      AccumuloConfiguration tableConf) throws IOException;
    +      RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;
     
       public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
    -      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
    +      RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
     
       /**
        * Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
        *
        */
     
    -  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf)
    -      throws IOException;
    +  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
    --- End diff --
    
    > Somehow get the RateLimiterFactory directly from the config? That'd be a stop-gap.
    
    @joshelser Not sure exactly what you mean, but what I meant was along the lines of creating a "FileOptions" (or similar) object to pass around instead of the AccumuloConfiguration object which is narrowly applicable to the context of those utility methods (context: operations on files?), and which is composed of the relevant configs from wherever they are sourced.
    
    @ShawnWalker I would consider it an optional improvement to this pull request. If the code modified here can be clearly demarcated as a distinct context, it may benefit from this kind of improvement as an alternative to adding the extra parameter everywhere. It can also be done at some future point in time, as a more general improvement (and a much bigger one... as it would require identifying multiple distinct contexts instead of possibly doing one at a time).
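
To make the "FileOptions" idea concrete, here is one possible shape for such an object. Everything in it is hypothetical: the field names, the builder, and the `describe` helper are illustrative only, and `RateLimiter` is a local stand-in for the interface added by the pull request.

```java
class FileOptionsDemo {
  // Hypothetical stand-in for the pull request's RateLimiter.
  interface RateLimiter {
    void acquire(long permits);
  }

  // Hypothetical immutable bundle of the settings that file operations need.
  static final class FileOptions {
    private final String file;
    private final boolean seekToBeginning;
    private final RateLimiter rateLimiter; // null means "no limit"

    private FileOptions(Builder b) {
      this.file = b.file;
      this.seekToBeginning = b.seekToBeginning;
      this.rateLimiter = b.rateLimiter;
    }

    String file() { return file; }
    boolean seekToBeginning() { return seekToBeginning; }
    RateLimiter rateLimiter() { return rateLimiter; }

    static Builder forFile(String file) {
      return new Builder(file);
    }

    static final class Builder {
      private final String file;
      private boolean seekToBeginning;
      private RateLimiter rateLimiter;

      private Builder(String file) { this.file = file; }

      Builder seekToBeginning(boolean value) { this.seekToBeginning = value; return this; }
      Builder rateLimiter(RateLimiter value) { this.rateLimiter = value; return this; }
      FileOptions build() { return new FileOptions(this); }
    }
  }

  // A method like openReader would take one argument instead of a growing list.
  static String describe(FileOptions opts) {
    return opts.file() + " seek=" + opts.seekToBeginning() + " limited=" + (opts.rateLimiter() != null);
  }

  public static void main(String[] args) {
    FileOptions opts = FileOptions.forFile("a.rf").seekToBeginning(true).build();
    System.out.println(describe(opts)); // prints "a.rf seek=true limited=false"
  }
}
```

With this shape, adding a new optional setting would touch only the builder and the callers that care about it, rather than every invocation.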



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59490202
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java ---
    @@ -0,0 +1,27 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.core.util.ratelimit;
    +
    +public interface RateLimiter {
    +  /**
    +   * Get current QPS of the rate limiter, with a nonpositive rate indicating no limit.
    --- End diff --
    
    "QPS"? Essentially bytes per second in our compaction limiting scope?



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59490681
  
    --- Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.server.util.ratelimit;
    +
    +import com.google.common.collect.ImmutableList;
    +import java.util.List;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.atomic.LongAdder;
    +import org.apache.accumulo.core.conf.AccumuloConfiguration;
    +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
    +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
    +import org.apache.accumulo.server.util.time.SimpleTimer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class SharedRateLimiterFactory {
    +  private static final long REPORT_RATE = 60000;
    +  private static final long UPDATE_RATE = 1000;
    +  private static SharedRateLimiterFactory instance = null;
    +  private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
    +  private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
    +
    +  private SharedRateLimiterFactory() {}
    +
    +  public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
    +    synchronized (SharedRateLimiterFactory.class) {
    +      if (instance == null) {
    +        instance = new SharedRateLimiterFactory();
    +
    +        // Update periodically
    +        timer.schedule(new Runnable() {
    +          @Override
    +          public void run() {
    +            instance.update();
    +          }
    +        }, UPDATE_RATE, UPDATE_RATE);
    +
    +        // Report periodically
    +        timer.schedule(new Runnable() {
    +          @Override
    +          public void run() {
    +            instance.report();
    +          }
    +        }, REPORT_RATE, REPORT_RATE);
    +      }
    +      return instance;
    +    }
    +  }
    +
    +  public static SharedRateLimiterFactory getInstance(AccumuloConfiguration conf) {
    +    return getInstance(SimpleTimer.getInstance(conf));
    +  }
    +
    +  /**
    +   * Lookup the RateLimiter associated with the specified name, or create a new one for that name. RateLimiters should be closed when no longer needed.
    +   *
    +   * @param name
    +   *          key for the rate limiter
    +   * @param rateGenerator
    +   *          a function which can be called to get what the current rate for the rate limiter should be.
    +   */
    +  public RateLimiter create(String name, Callable<Long> rateGenerator) {
    +    synchronized (activeLimiters) {
    +      if (activeLimiters.containsKey(name)) {
    +        SharedRateLimiter limiter = activeLimiters.get(name);
    +        return limiter;
    +      } else {
    +        long initialRate;
    +        try {
    +          initialRate = rateGenerator.call();
    +        } catch (Exception ex) {
    +          throw new IllegalStateException(ex);
    +        }
    +        SharedRateLimiter limiter = new SharedRateLimiter(name, rateGenerator, initialRate);
    +        activeLimiters.put(name, limiter);
    +        return limiter;
    +      }
    +    }
    +  }
    +
    +  protected void update() {
    +    List<SharedRateLimiter> limiters;
    +    synchronized (activeLimiters) {
    +      limiters = ImmutableList.copyOf(activeLimiters.values());
    +    }
    +    for (SharedRateLimiter limiter : limiters) {
    +      limiter.update();
    +    }
    +  }
    +
    +  protected void report() {
    +    List<SharedRateLimiter> limiters;
    +    synchronized (activeLimiters) {
    +      limiters = ImmutableList.copyOf(activeLimiters.values());
    +    }
    +    for (SharedRateLimiter limiter : limiters) {
    +      limiter.report();
    +    }
    +  }
    +
    +  protected class SharedRateLimiter extends GuavaRateLimiter {
    +    private final LongAdder permitsAcquired = new LongAdder();
    +    private final Callable<Long> rateCallable;
    +    private final String name;
    +
    +    SharedRateLimiter(String name, Callable<Long> rateCallable, long initialRate) {
    +      super(initialRate);
    +      this.name = name;
    +      this.rateCallable = rateCallable;
    +    }
    +
    +    @Override
    +    public void acquire(long permits) {
    +      super.acquire(permits);
    +      permitsAcquired.add(permits);
    +    }
    +
    +    public void update() {
    +      try {
    +        // Reset rate if needed
    +        long rate = rateCallable.call();
    +        if (rate != getRate()) {
    +          setRate(rate);
    +        }
    +      } catch (Exception ex) {
    +        logger.debug("Failed to update rate limiter", ex);
    +      }
    +    }
    +
    +    public void report() {
    +      long sum = permitsAcquired.sumThenReset();
    +      if (sum > 0) {
    +        logger.debug(String.format("RateLimiter '%s': %,d of %,d permits/second", name, sum * 1000L / REPORT_RATE, getRate()));
    --- End diff --
    
    Wrap this in a `logger.isDebugEnabled()` conditional, please. Typically, Logger instances are named `log` not `logger`. You could also do the string formatting "natively" with SLF4J's `{}` replacement syntax.



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59565511
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.core.file.streams;
    +
    +import java.io.FilterInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
    +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
    +import org.apache.hadoop.fs.Seekable;
    +
    +/** A decorator for an {@code InputStream} which limits the rate at which reads are performed. */
    --- End diff --
    
    No no, I meant add line returns so it's:
    ```
    /**
     * A decorator for an {@code InputStream} which limits the rate at which reads are performed.
     */
    ```




[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ShawnWalker <gi...@git.apache.org>.
Github user ShawnWalker commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59566722
  
    --- Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.server.util.ratelimit;
    +
    +import com.google.common.collect.ImmutableList;
    +import java.util.List;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.atomic.LongAdder;
    +import org.apache.accumulo.core.conf.AccumuloConfiguration;
    +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
    +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
    +import org.apache.accumulo.server.util.time.SimpleTimer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class SharedRateLimiterFactory {
    +  private static final long REPORT_RATE = 60000;
    +  private static final long UPDATE_RATE = 1000;
    +  private static SharedRateLimiterFactory instance = null;
    +  private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
    +  private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
    +
    +  private SharedRateLimiterFactory() {}
    +
    +  public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
    +    synchronized (SharedRateLimiterFactory.class) {
    +      if (instance == null) {
    +        instance = new SharedRateLimiterFactory();
    +
    +        // Update periodically
    +        timer.schedule(new Runnable() {
    --- End diff --
    
    Indeed, this is the entire purpose of calling the `update()` method on each rate limiter on a timer.  The only reason that `SharedRateLimiterFactory` even touches `AccumuloConfiguration` is so that it can grab hold of an instance of `SimpleTimer`.  Moving this polling to its own `Timer` object will make `SharedRateLimiterFactory` a self-contained singleton.
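
A self-contained singleton along the lines described above might look roughly like this. It is a minimal sketch under stated assumptions, not the change that was eventually made: the class name, the shortened 50 ms interval, and the latch used to observe the first update are all hypothetical, and `update()` is only a placeholder for polling each limiter's rate.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for SharedRateLimiterFactory.
final class SelfContainedFactorySketch {
  private static final long UPDATE_RATE_MS = 50; // shortened for the demo
  private static SelfContainedFactorySketch instance;

  // Daemon timer owned by the singleton, so no external timer or configuration is needed.
  private final Timer timer = new Timer("rate-limiter-update", true);
  private final CountDownLatch firstUpdate = new CountDownLatch(1);

  private SelfContainedFactorySketch() {}

  static synchronized SelfContainedFactorySketch getInstance() {
    if (instance == null) {
      final SelfContainedFactorySketch created = new SelfContainedFactorySketch();
      created.timer.schedule(new TimerTask() {
        @Override
        public void run() {
          created.update();
        }
      }, UPDATE_RATE_MS, UPDATE_RATE_MS);
      instance = created;
    }
    return instance;
  }

  // Placeholder for polling each limiter's rate; here it only records that it ran.
  void update() {
    firstUpdate.countDown();
  }

  // Waits until update() has run at least once, or the timeout elapses.
  boolean awaitFirstUpdate(long timeoutMs) {
    try {
      return firstUpdate.await(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("update ran: " + getInstance().awaitFirstUpdate(5000));
  }
}
```

Marking the timer thread as a daemon is a design choice in this sketch so that the polling thread does not keep the JVM alive on shutdown.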



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59585544
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.core.util.ratelimit;
    +
    +/** Rate limiter from the Guava library. */
    +public class GuavaRateLimiter implements RateLimiter {
    +  private final com.google.common.util.concurrent.RateLimiter rateLimiter;
    +  private long currentRate;
    +
    +  public GuavaRateLimiter(long initialRate) {
    +    this.currentRate = initialRate;
    --- End diff --
    
    The docs for `tserver.compaction.major.throughput` specify using 0 for unlimited.  Is specifying 0 or negative documented elsewhere?



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59578436
  
    --- Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.server.util.ratelimit;
    +
    +import com.google.common.collect.ImmutableList;
    +import java.util.List;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.atomic.LongAdder;
    --- End diff --
    
    I was unaware of `LongAdder`... it's really neat.  Yet another nice thing in JDK8 we can't use yet.
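
For readers who have not used it, the way `LongAdder` serves as a permit counter can be sketched as below. The class and method names are hypothetical and the sketch requires Java 8; only `add` and `sumThenReset` correspond to calls visible in the quoted diff.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical counter mirroring how the diff tracks acquired permits.
final class PermitCounterSketch {
  private final LongAdder permitsAcquired = new LongAdder();

  // Cheap under contention: threads update separate internal cells.
  void acquire(long permits) {
    permitsAcquired.add(permits);
  }

  // Returns the accumulated total and resets it, as a periodic report would.
  long drain() {
    return permitsAcquired.sumThenReset();
  }

  // Runs several threads that each record one permit per acquisition, then drains.
  static long countConcurrently(int threads, int acquisitionsPerThread) {
    PermitCounterSketch counter = new PermitCounterSketch();
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> {
        for (int j = 0; j < acquisitionsPerThread; j++) {
          counter.acquire(1);
        }
      });
      workers[i].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return counter.drain();
  }

  public static void main(String[] args) {
    System.out.println(countConcurrently(4, 1000)); // prints 4000
  }
}
```

Note that `sumThenReset` is only an exact snapshot when no updates are in flight, which is acceptable for approximate throughput reporting.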



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59489612
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java ---
    @@ -14,24 +14,22 @@
      * License for the specific language governing permissions and limitations under
      * the License.
      */
    -
    -package org.apache.accumulo.core.file.rfile.bcfile;
    +package org.apache.accumulo.core.file.streams;
     
     import java.io.IOException;
     import java.io.InputStream;
     import java.security.AccessController;
     import java.security.PrivilegedActionException;
     import java.security.PrivilegedExceptionAction;
     
    -import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.Seekable;
    --- End diff --
    
    `Seekable` is annotated public/evolving, which means it's probably OK for us to expect it to be stable, but I'm mentioning it to make sure we consciously acknowledge that.



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ctubbsii <gi...@git.apache.org>.
Github user ctubbsii commented on the pull request:

    https://github.com/apache/accumulo/pull/90#issuecomment-211579493
  
    Looks like there are a few trivial FindBugs issues to address in the IT.



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ShawnWalker <gi...@git.apache.org>.
Github user ShawnWalker commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59564578
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.core.file.streams;
    +
    +import java.io.FilterInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
    +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
    +import org.apache.hadoop.fs.Seekable;
    +
    +/** A decorator for an {@code InputStream} which limits the rate at which reads are performed. */
    --- End diff --
    
    Any suggestions?  I'm uncertain what more to say.



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ShawnWalker <gi...@git.apache.org>.
Github user ShawnWalker commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59568240
  
    --- Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.server.util.ratelimit;
    +
    +import com.google.common.collect.ImmutableList;
    +import java.util.List;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.atomic.LongAdder;
    +import org.apache.accumulo.core.conf.AccumuloConfiguration;
    +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
    +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
    +import org.apache.accumulo.server.util.time.SimpleTimer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class SharedRateLimiterFactory {
    +  private static final long REPORT_RATE = 60000;
    +  private static final long UPDATE_RATE = 1000;
    +  private static SharedRateLimiterFactory instance = null;
    +  private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
    +  private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
    +
    +  private SharedRateLimiterFactory() {}
    +
    +  public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
    +    synchronized (SharedRateLimiterFactory.class) {
    +      if (instance == null) {
    +        instance = new SharedRateLimiterFactory();
    +
    +        // Update periodically
    +        timer.schedule(new Runnable() {
    +          @Override
    +          public void run() {
    +            instance.update();
    +          }
    +        }, UPDATE_RATE, UPDATE_RATE);
    +
    +        // Report periodically
    +        timer.schedule(new Runnable() {
    +          @Override
    +          public void run() {
    +            instance.report();
    +          }
    +        }, REPORT_RATE, REPORT_RATE);
    +      }
    +      return instance;
    +    }
    +  }
    +
    +  public static SharedRateLimiterFactory getInstance(AccumuloConfiguration conf) {
    +    return getInstance(SimpleTimer.getInstance(conf));
    +  }
    +
    +  /**
    +   * Lookup the RateLimiter associated with the specified name, or create a new one for that name. RateLimiters should be closed when no longer needed.
    +   *
    +   * @param name
    +   *          key for the rate limiter
    +   * @param rateGenerator
    +   *          a function which can be called to get what the current rate for the rate limiter should be.
    +   */
    +  public RateLimiter create(String name, Callable<Long> rateGenerator) {
    +    synchronized (activeLimiters) {
    +      if (activeLimiters.containsKey(name)) {
    +        SharedRateLimiter limiter = activeLimiters.get(name);
    +        return limiter;
    +      } else {
    +        long initialRate;
    +        try {
    +          initialRate = rateGenerator.call();
    +        } catch (Exception ex) {
    +          throw new IllegalStateException(ex);
    +        }
    +        SharedRateLimiter limiter = new SharedRateLimiter(name, rateGenerator, initialRate);
    +        activeLimiters.put(name, limiter);
    +        return limiter;
    +      }
    +    }
    +  }
    +
    +  protected void update() {
    +    List<SharedRateLimiter> limiters;
    +    synchronized (activeLimiters) {
    +      limiters = ImmutableList.copyOf(activeLimiters.values());
    +    }
    +    for (SharedRateLimiter limiter : limiters) {
    +      limiter.update();
    +    }
    +  }
    +
    +  protected void report() {
    +    List<SharedRateLimiter> limiters;
    +    synchronized (activeLimiters) {
    +      limiters = ImmutableList.copyOf(activeLimiters.values());
    +    }
    +    for (SharedRateLimiter limiter : limiters) {
    +      limiter.report();
    +    }
    +  }
    +
    +  protected class SharedRateLimiter extends GuavaRateLimiter {
    +    private final LongAdder permitsAcquired = new LongAdder();
    +    private final Callable<Long> rateCallable;
    +    private final String name;
    +
    +    SharedRateLimiter(String name, Callable<Long> rateCallable, long initialRate) {
    +      super(initialRate);
    +      this.name = name;
    +      this.rateCallable = rateCallable;
    +    }
    +
    +    @Override
    +    public void acquire(long permits) {
    +      super.acquire(permits);
    +      permitsAcquired.add(permits);
    +    }
    +
    +    public void update() {
    +      try {
    +        // Reset rate if needed
    +        long rate = rateCallable.call();
    +        if (rate != getRate()) {
    +          setRate(rate);
    +        }
    +      } catch (Exception ex) {
    +        logger.debug("Failed to update rate limiter", ex);
    +      }
    +    }
    +
    +    public void report() {
    +      long sum = permitsAcquired.sumThenReset();
    +      if (sum > 0) {
    +        logger.debug(String.format("RateLimiter '%s': %,d of %,d permits/second", name, sum * 1000L / REPORT_RATE, getRate()));
    --- End diff --
    
    Can the logger configuration be changed at runtime?  It might make sense simply not to schedule the reporting task if debug logging is disabled at startup.



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ctubbsii <gi...@git.apache.org>.
Github user ctubbsii commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59571367
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/FileOperations.java ---
    @@ -55,23 +56,24 @@ public static FileOperations getInstance() {
        */
     
       public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
    -      AccumuloConfiguration tableConf) throws IOException;
    +      RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;
     
       public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
    -      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
    +      RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
     
       /**
        * Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
        *
        */
     
    -  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf)
    -      throws IOException;
    +  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
    --- End diff --
    
    So, I've had some thoughts on this as well. A lot of our internal passing around of AccumuloConfiguration makes it easy to lose track of where the configuration object came from, making it harder to ensure we're not using it outside the intended scope (for example, if it represents a TableConfiguration).
    
    In many parts of our internal code, I would prefer we grab what we need from AccumuloConfiguration, combine it with any additional configuration appropriate for that specific context, and create a new context-specific configuration object to pass around within that context.
    
    If we were to do something like that, what would the context granularity be here? "file operations options", "file reading options", something else?
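
One possible shape for such a context-specific object, at the "file reading options" granularity, is sketched below. Everything in it is an assumption made for illustration: the class name, the key `example.block.size`, its default, and the use of a plain `Map` as a stand-in for `AccumuloConfiguration`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable options object scoped to reading a single file.
final class FileReadOptionsSketch {
  private final long blockSize;
  private final long readBytesPerSecond; // 0 is treated as "unlimited" in this sketch

  private FileReadOptionsSketch(long blockSize, long readBytesPerSecond) {
    this.blockSize = blockSize;
    this.readBytesPerSecond = readBytesPerSecond;
  }

  // Copies only the values this context needs out of the general configuration,
  // then combines them with context-specific settings such as the read limit.
  static FileReadOptionsSketch from(Map<String,String> generalConf, long readBytesPerSecond) {
    long blockSize = Long.parseLong(generalConf.getOrDefault("example.block.size", "65536"));
    return new FileReadOptionsSketch(blockSize, readBytesPerSecond);
  }

  long getBlockSize() {
    return blockSize;
  }

  long getReadBytesPerSecond() {
    return readBytesPerSecond;
  }

  public static void main(String[] args) {
    Map<String,String> conf = new HashMap<>();
    conf.put("example.block.size", "131072");
    FileReadOptionsSketch options = from(conf, 30L * 1024 * 1024);
    System.out.println(options.getBlockSize() + " " + options.getReadBytesPerSecond());
  }
}
```

Since the object holds copies rather than a reference to the configuration, later code cannot accidentally read settings outside the intended scope.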



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on the pull request:

    https://github.com/apache/accumulo/pull/90#issuecomment-211550045
  
    > With Keith's help, I've added a small end-to-end IT on rate limiting of major compactions.
    
    Looks good. Great work guys! :+1: 



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59490427
  
    --- Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.server.util.ratelimit;
    +
    +import com.google.common.collect.ImmutableList;
    +import java.util.List;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.atomic.LongAdder;
    +import org.apache.accumulo.core.conf.AccumuloConfiguration;
    +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
    +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
    +import org.apache.accumulo.server.util.time.SimpleTimer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class SharedRateLimiterFactory {
    +  private static final long REPORT_RATE = 60000;
    +  private static final long UPDATE_RATE = 1000;
    +  private static SharedRateLimiterFactory instance = null;
    +  private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
    +  private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
    +
    +  private SharedRateLimiterFactory() {}
    +
    +  public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
    +    synchronized (SharedRateLimiterFactory.class) {
    --- End diff --
    
    Why not just `public static synchronized`?
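
The two forms being compared lock on the same monitor, as the following sketch shows. The class is a hypothetical stand-in for the factory; a `static synchronized` method acquires the monitor of the class object, which is exactly what the explicit block in the diff does.

```java
// Hypothetical singleton showing both locking forms side by side.
final class SynchronizedSingletonSketch {
  private static SynchronizedSingletonSketch instance;

  private SynchronizedSingletonSketch() {}

  // Form used in the diff: an explicit block on the class object.
  static SynchronizedSingletonSketch viaBlock() {
    synchronized (SynchronizedSingletonSketch.class) {
      if (instance == null) {
        instance = new SynchronizedSingletonSketch();
      }
      return instance;
    }
  }

  // Suggested form: the method modifier acquires the same class-object monitor.
  static synchronized SynchronizedSingletonSketch viaMethod() {
    if (instance == null) {
      instance = new SynchronizedSingletonSketch();
    }
    return instance;
  }

  public static void main(String[] args) {
    System.out.println(viaBlock() == viaMethod()); // prints true
  }
}
```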



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59578033
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.core.util.ratelimit;
    +
    +/** Rate limiter from the Guava library. */
    +public class GuavaRateLimiter implements RateLimiter {
    +  private final com.google.common.util.concurrent.RateLimiter rateLimiter;
    +  private long currentRate;
    +
    +  public GuavaRateLimiter(long initialRate) {
    +    this.currentRate = initialRate;
    --- End diff --
    
    Should there be a sanity check here to ensure the rate is non-negative?  Or are there sufficient checks elsewhere?
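
A constructor-level check of the kind asked about could be sketched as follows. This is a hypothetical stand-in that omits the Guava delegate; the class name and the `isUnlimited()` helper are assumptions, and treating `0` as unlimited follows the documented meaning of `tserver.compaction.major.throughput`.

```java
// Hypothetical stand-in for GuavaRateLimiter's constructor validation.
final class RateValidationSketch {
  private final long currentRate;

  RateValidationSketch(long initialRate) {
    // Reject negative rates up front instead of relying on checks elsewhere.
    if (initialRate < 0) {
      throw new IllegalArgumentException("rate must be non-negative, got " + initialRate);
    }
    this.currentRate = initialRate;
  }

  // A rate of 0 is interpreted as "no limit" in this sketch.
  boolean isUnlimited() {
    return currentRate == 0;
  }

  long getRate() {
    return currentRate;
  }

  public static void main(String[] args) {
    System.out.println(new RateValidationSketch(0).isUnlimited());
    System.out.println(new RateValidationSketch(30L * 1024 * 1024).getRate());
  }
}
```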



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59586564
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.core.util.ratelimit;
    +
    +/** Rate limiter from the Guava library. */
    +public class GuavaRateLimiter implements RateLimiter {
    +  private final com.google.common.util.concurrent.RateLimiter rateLimiter;
    +  private long currentRate;
    +
    +  public GuavaRateLimiter(long initialRate) {
    +    this.currentRate = initialRate;
    --- End diff --
    
    Looking into the validation associated with `PropertyType.MEMORY`, it seems to check for `>= 0`.



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ShawnWalker <gi...@git.apache.org>.
Github user ShawnWalker commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59582717
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.core.file.streams;
    +
    +import java.io.FilterOutputStream;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +
    +public class PositionedOutputs {
    +  public static PositionedOutputStream wrap(final OutputStream fout) {
    +    if (fout instanceof FSDataOutputStream) {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          return ((FSDataOutputStream) fout).getPos();
    +        }
    +      };
    +    } else if (fout instanceof PositionedOutput) {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          return ((PositionedOutput) fout).position();
    +        }
    +      };
    +    } else {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          throw new UnsupportedOperationException("Underlying stream does not support position()");
    +        }
    +      };
    +    }
    +  }
    +
    +  public static abstract class PositionedOutputStream extends FilterOutputStream implements PositionedOutput {
    --- End diff --
    
    Wouldn't work.  The caller would have to explicitly specify what `T` is (since the compiler can't determine it), and `foo` would be obliged to return a `T`.
    
    It might work to make `PositionedOutputStream` private.  Then nobody could actually do anything more with it than make use of the fact that it extends `OutputStream` and implements `PositionedOutput`.
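
A minimal sketch of the generics point, assuming hypothetical stand-ins (`PositionedOutput`, `CountingStream`, `writeAll`) rather than the classes in the pull request. An intersection bound is convenient on a parameter, because `T` is inferred from the argument. The comment above explains why the same bound is awkward on a return type.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class IntersectionBoundSketch {
  /** Hypothetical stand-in for the PositionedOutput interface discussed above. */
  interface PositionedOutput {
    long position() throws IOException;
  }

  /** Hypothetical stream that counts bytes so it can report a position. */
  static class CountingStream extends FilterOutputStream implements PositionedOutput {
    private long written = 0;

    CountingStream(OutputStream out) {
      super(out);
    }

    @Override
    public void write(int b) throws IOException {
      out.write(b);
      written++;
    }

    @Override
    public long position() {
      return written;
    }
  }

  // As a parameter bound, T is inferred from the argument, so callers never name it.
  static <T extends OutputStream & PositionedOutput> long writeAll(T out, byte[] data) throws IOException {
    out.write(data); // FilterOutputStream routes this through write(int)
    return out.position();
  }

  public static void main(String[] args) throws IOException {
    CountingStream stream = new CountingStream(new ByteArrayOutputStream());
    System.out.println(writeAll(stream, new byte[] {1, 2, 3}));
  }
}
```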


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59564604
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.core.file.streams;
    +
    +import java.io.FilterOutputStream;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +
    +public class PositionedOutputs {
    +  public static PositionedOutputStream wrap(final OutputStream fout) {
    +    if (fout instanceof FSDataOutputStream) {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          return ((FSDataOutputStream) fout).getPos();
    +        }
    +      };
    +    } else if (fout instanceof PositionedOutput) {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          return ((PositionedOutput) fout).position();
    +        }
    +      };
    +    } else {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          throw new UnsupportedOperationException("Underlying stream does not support position()");
    +        }
    +      };
    +    }
    +  }
    +
    +  public static abstract class PositionedOutputStream extends FilterOutputStream implements PositionedOutput {
    --- End diff --
    
    Oh, right you are. I thought callers of `PositionedOutputs.wrap` would be using it, but apparently not. As long as `PositionedOutputs.wrap` is returning `PositionedOutputStream`, I think this should be separated. If you want to change `PositionedOutputs.wrap` to just return a `FilterOutputStream`, we can make this `private static abstract ...`.


[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59489093
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/conf/Property.java ---
    @@ -293,6 +293,8 @@
           "The maximum number of concurrent tablet migrations for a tablet server"),
       TSERV_MAJC_MAXCONCURRENT("tserver.compaction.major.concurrent.max", "3", PropertyType.COUNT,
           "The maximum number of concurrent major compactions for a tablet server"),
    +  TSERV_MAJC_THROUGHPUT("tserver.compaction.major.throughput", "0B", PropertyType.MEMORY,
    +      "Maximum number of bytes to read or write per second over all major compactions, or 0 for unlimited."),
    --- End diff --
    
    ".... in a TabletServer." Clarify that this is not global, but local to a tserver.


[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ShawnWalker <gi...@git.apache.org>.
Github user ShawnWalker commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59563956
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.core.file.streams;
    +
    +import java.io.FilterOutputStream;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +
    +public class PositionedOutputs {
    +  public static PositionedOutputStream wrap(final OutputStream fout) {
    +    if (fout instanceof FSDataOutputStream) {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          return ((FSDataOutputStream) fout).getPos();
    +        }
    +      };
    +    } else if (fout instanceof PositionedOutput) {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          return ((PositionedOutput) fout).position();
    +        }
    +      };
    +    } else {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          throw new UnsupportedOperationException("Underlying stream does not support position()");
    +        }
    +      };
    +    }
    +  }
    +
    +  public static abstract class PositionedOutputStream extends FilterOutputStream implements PositionedOutput {
    --- End diff --
    
    I don't believe it is, in fact.


[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ShawnWalker <gi...@git.apache.org>.
Github user ShawnWalker commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59565074
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java ---
    @@ -0,0 +1,27 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.core.util.ratelimit;
    +
    +public interface RateLimiter {
    +  /**
    +   * Get current QPS of the rate limiter, with a nonpositive rate indicating no limit.
    --- End diff --
    
    Yes.  I chose to say QPS (queries per second) here as it is the terminology used by `com.google.common.util.concurrent.RateLimiter` to describe its functionality.  Things other than byte counts might potentially wish to be rate limited.
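
A minimal sketch of a unit-agnostic limiter interface along the lines described above. The names `getRate`, `acquire`, and `NullRateLimiter` appear in the quoted diffs, but the exact signatures and the body of `NullRateLimiter` are assumptions made for illustration.

```java
class RateLimiterSketch {
  /** Unit-agnostic limiter: a "permit" may be a byte, a query, or anything else. */
  interface RateLimiter {
    /** Permits per second; a non-positive value means no limit. */
    long getRate();

    /** Block until the requested number of permits may be consumed. */
    void acquire(long permits);
  }

  /** Assumed behaviour: a limiter that reports "no limit" and never blocks. */
  static final class NullRateLimiter implements RateLimiter {
    @Override
    public long getRate() {
      return 0;
    }

    @Override
    public void acquire(long permits) {
      // Intentionally empty: nothing is throttled.
    }
  }

  public static void main(String[] args) {
    RateLimiter limiter = new NullRateLimiter();
    limiter.acquire(4096); // returns immediately
    System.out.println("rate = " + limiter.getRate());
  }
}
```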


[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59579811
  
    --- Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.server.util.ratelimit;
    +
    +import com.google.common.collect.ImmutableList;
    +import java.util.List;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.atomic.LongAdder;
    +import org.apache.accumulo.core.conf.AccumuloConfiguration;
    +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
    +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
    +import org.apache.accumulo.server.util.time.SimpleTimer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class SharedRateLimiterFactory {
    +  private static final long REPORT_RATE = 60000;
    +  private static final long UPDATE_RATE = 1000;
    +  private static SharedRateLimiterFactory instance = null;
    +  private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
    +  private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
    +
    +  private SharedRateLimiterFactory() {}
    +
    +  public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
    +    synchronized (SharedRateLimiterFactory.class) {
    +      if (instance == null) {
    +        instance = new SharedRateLimiterFactory();
    +
    +        // Update periodically
    +        timer.schedule(new Runnable() {
    +          @Override
    +          public void run() {
    +            instance.update();
    +          }
    +        }, UPDATE_RATE, UPDATE_RATE);
    +
    +        // Report periodically
    +        timer.schedule(new Runnable() {
    +          @Override
    +          public void run() {
    +            instance.report();
    +          }
    +        }, REPORT_RATE, REPORT_RATE);
    +      }
    +      return instance;
    +    }
    +  }
    +
    +  public static SharedRateLimiterFactory getInstance(AccumuloConfiguration conf) {
    +    return getInstance(SimpleTimer.getInstance(conf));
    +  }
    +
    +  /**
    +   * Lookup the RateLimiter associated with the specified name, or create a new one for that name. RateLimiters should be closed when no longer needed.
    +   *
    +   * @param name
    +   *          key for the rate limiter
    +   * @param rateGenerator
    +   *          a function which can be called to get what the current rate for the rate limiter should be.
    +   */
    +  public RateLimiter create(String name, Callable<Long> rateGenerator) {
    +    synchronized (activeLimiters) {
    +      if (activeLimiters.containsKey(name)) {
    +        SharedRateLimiter limiter = activeLimiters.get(name);
    +        return limiter;
    +      } else {
    +        long initialRate;
    +        try {
    +          initialRate = rateGenerator.call();
    +        } catch (Exception ex) {
    +          throw new IllegalStateException(ex);
    +        }
    +        SharedRateLimiter limiter = new SharedRateLimiter(name, rateGenerator, initialRate);
    +        activeLimiters.put(name, limiter);
    +        return limiter;
    +      }
    +    }
    +  }
    +
    +  protected void update() {
    +    List<SharedRateLimiter> limiters;
    +    synchronized (activeLimiters) {
    +      limiters = ImmutableList.copyOf(activeLimiters.values());
    +    }
    +    for (SharedRateLimiter limiter : limiters) {
    +      limiter.update();
    +    }
    +  }
    +
    +  protected void report() {
    +    List<SharedRateLimiter> limiters;
    +    synchronized (activeLimiters) {
    +      limiters = ImmutableList.copyOf(activeLimiters.values());
    +    }
    +    for (SharedRateLimiter limiter : limiters) {
    +      limiter.report();
    +    }
    +  }
    +
    +  protected class SharedRateLimiter extends GuavaRateLimiter {
    +    private final LongAdder permitsAcquired = new LongAdder();
    +    private final Callable<Long> rateCallable;
    +    private final String name;
    +
    +    SharedRateLimiter(String name, Callable<Long> rateCallable, long initialRate) {
    +      super(initialRate);
    +      this.name = name;
    +      this.rateCallable = rateCallable;
    +    }
    +
    +    @Override
    +    public void acquire(long permits) {
    +      super.acquire(permits);
    +      permitsAcquired.add(permits);
    +    }
    +
    +    public void update() {
    +      try {
    +        // Reset rate if needed
    +        long rate = rateCallable.call();
    +        if (rate != getRate()) {
    +          setRate(rate);
    +        }
    +      } catch (Exception ex) {
    +        logger.debug("Failed to update rate limiter", ex);
    --- End diff --
    
    Why log at debug here?  I'm not advocating for another level, just curious why this level was chosen.


[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59596175
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/FileOperations.java ---
    @@ -55,23 +56,24 @@ public static FileOperations getInstance() {
        */
     
       public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
    -      AccumuloConfiguration tableConf) throws IOException;
    +      RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;
     
       public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
    -      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
    +      RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
     
       /**
        * Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
        *
        */
     
    -  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf)
    -      throws IOException;
    +  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
    --- End diff --
    
    > It sounds like you're advocating a more general refactoring of AccumuloConfiguration handling.
    
    Well, it's relevant in that without some change, you have `, null` sprinkled everywhere.
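
One possible way to keep `, null` out of most call sites, offered only as an illustration and not as what the pull request does, is a delegating overload. Every name below (`FileOps`, `DescribingFileOps`, the two-argument `openReader`) is a simplified, hypothetical stand-in for the real `FileOperations` API.

```java
class DelegatingOverloadSketch {
  /** Hypothetical stand-in for the RateLimiter interface. */
  interface RateLimiter {
    void acquire(long permits);
  }

  /** Hypothetical, heavily simplified stand-in for FileOperations. */
  abstract static class FileOps {
    /** Full form: a null limiter means reads are not throttled. */
    abstract String openReader(String file, RateLimiter readLimiter);

    /** Convenience form for the many callers that never throttle. */
    String openReader(String file) {
      return openReader(file, null);
    }
  }

  /** Toy implementation that only describes what it was asked to open. */
  static final class DescribingFileOps extends FileOps {
    @Override
    String openReader(String file, RateLimiter readLimiter) {
      return file + (readLimiter == null ? " (unlimited)" : " (rate limited)");
    }
  }

  public static void main(String[] args) {
    FileOps ops = new DescribingFileOps();
    System.out.println(ops.openReader("example.rf"));
  }
}
```

The trade-off is a wider API surface: each existing overload would gain a twin, which is part of why a broader refactoring comes up in this thread.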


[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59489844
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.core.file.streams;
    +
    +import java.io.FilterInputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
    +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
    +import org.apache.hadoop.fs.Seekable;
    +
    +/** A decorator for an {@code InputStream} which limits the rate at which reads are performed. */
    --- End diff --
    
    Expand the javadoc please (style-nit)
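
For context, the decorator that the one-line javadoc describes could look roughly like the sketch below. This is not the code from the pull request: the nested `RateLimiter` interface is a hypothetical stand-in, and charging the limiter after each read, for the bytes actually returned, is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicLong;

class RateLimitedReadSketch {
  /** Hypothetical stand-in for the RateLimiter interface. */
  interface RateLimiter {
    void acquire(long permits);
  }

  /** Decorates an InputStream so that every byte read costs one permit. */
  static final class RateLimitedInputStream extends FilterInputStream {
    private final RateLimiter limiter;

    RateLimitedInputStream(InputStream in, RateLimiter limiter) {
      super(in);
      this.limiter = limiter;
    }

    @Override
    public int read() throws IOException {
      int b = in.read();
      if (b >= 0) {
        limiter.acquire(1);
      }
      return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
      int count = in.read(buf, off, len);
      if (count > 0) {
        limiter.acquire(count); // charge only for bytes actually read
      }
      return count;
    }
  }

  public static void main(String[] args) throws IOException {
    AtomicLong charged = new AtomicLong();
    // The method reference acts as a limiter that only counts permits.
    InputStream in = new RateLimitedInputStream(new ByteArrayInputStream(new byte[5]), charged::addAndGet);
    in.read(new byte[8]);
    System.out.println("permits charged: " + charged.get());
  }
}
```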


[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ShawnWalker <gi...@git.apache.org>.
Github user ShawnWalker commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59581868
  
    --- Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.server.util.ratelimit;
    +
    +import com.google.common.collect.ImmutableList;
    +import java.util.List;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.atomic.LongAdder;
    +import org.apache.accumulo.core.conf.AccumuloConfiguration;
    +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
    +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
    +import org.apache.accumulo.server.util.time.SimpleTimer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class SharedRateLimiterFactory {
    +  private static final long REPORT_RATE = 60000;
    +  private static final long UPDATE_RATE = 1000;
    +  private static SharedRateLimiterFactory instance = null;
    +  private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
    +  private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
    +
    +  private SharedRateLimiterFactory() {}
    +
    +  public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
    +    synchronized (SharedRateLimiterFactory.class) {
    +      if (instance == null) {
    +        instance = new SharedRateLimiterFactory();
    +
    +        // Update periodically
    +        timer.schedule(new Runnable() {
    +          @Override
    +          public void run() {
    +            instance.update();
    +          }
    +        }, UPDATE_RATE, UPDATE_RATE);
    +
    +        // Report periodically
    +        timer.schedule(new Runnable() {
    +          @Override
    +          public void run() {
    +            instance.report();
    +          }
    +        }, REPORT_RATE, REPORT_RATE);
    +      }
    +      return instance;
    +    }
    +  }
    +
    +  public static SharedRateLimiterFactory getInstance(AccumuloConfiguration conf) {
    +    return getInstance(SimpleTimer.getInstance(conf));
    +  }
    +
    +  /**
    +   * Lookup the RateLimiter associated with the specified name, or create a new one for that name. RateLimiters should be closed when no longer needed.
    +   *
    +   * @param name
    +   *          key for the rate limiter
    +   * @param rateGenerator
    +   *          a function which can be called to get what the current rate for the rate limiter should be.
    +   */
    +  public RateLimiter create(String name, Callable<Long> rateGenerator) {
    +    synchronized (activeLimiters) {
    +      if (activeLimiters.containsKey(name)) {
    +        SharedRateLimiter limiter = activeLimiters.get(name);
    +        return limiter;
    +      } else {
    +        long initialRate;
    +        try {
    +          initialRate = rateGenerator.call();
    +        } catch (Exception ex) {
    +          throw new IllegalStateException(ex);
    +        }
    +        SharedRateLimiter limiter = new SharedRateLimiter(name, rateGenerator, initialRate);
    +        activeLimiters.put(name, limiter);
    +        return limiter;
    +      }
    +    }
    +  }
    +
    +  protected void update() {
    +    List<SharedRateLimiter> limiters;
    +    synchronized (activeLimiters) {
    +      limiters = ImmutableList.copyOf(activeLimiters.values());
    +    }
    +    for (SharedRateLimiter limiter : limiters) {
    +      limiter.update();
    +    }
    +  }
    +
    +  protected void report() {
    +    List<SharedRateLimiter> limiters;
    +    synchronized (activeLimiters) {
    +      limiters = ImmutableList.copyOf(activeLimiters.values());
    +    }
    +    for (SharedRateLimiter limiter : limiters) {
    +      limiter.report();
    +    }
    +  }
    +
    +  protected class SharedRateLimiter extends GuavaRateLimiter {
    +    private final LongAdder permitsAcquired = new LongAdder();
    +    private final Callable<Long> rateCallable;
    +    private final String name;
    +
    +    SharedRateLimiter(String name, Callable<Long> rateCallable, long initialRate) {
    +      super(initialRate);
    +      this.name = name;
    +      this.rateCallable = rateCallable;
    +    }
    +
    +    @Override
    +    public void acquire(long permits) {
    +      super.acquire(permits);
    +      permitsAcquired.add(permits);
    +    }
    +
    +    public void update() {
    +      try {
    +        // Reset rate if needed
    +        long rate = rateCallable.call();
    +        if (rate != getRate()) {
    +          setRate(rate);
    +        }
    +      } catch (Exception ex) {
    +        logger.debug("Failed to update rate limiter", ex);
    --- End diff --
    
    1. "This should never happen."
    2. It's not a tragedy if it does happen.
    
    Suggestions?
    
    I suppose I could introduce a new interface (e.g. `SharedRateLimiterFactory.RateProvider`) instead of just using `Callable<Long>`.  By making `RateProvider`'s method not declare any checked exceptions, we could avoid the issue.
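
A minimal sketch of that idea, assuming a hypothetical method name (`getDesiredRate`) and a hypothetical `RefreshingLimiter` in place of `SharedRateLimiter`. Because the provider method declares no checked exceptions, `update()` needs no try/catch and the question of log level does not arise.

```java
import java.util.concurrent.atomic.AtomicLong;

class RateProviderSketch {
  /** Hypothetical replacement for Callable<Long>: nothing checked to catch. */
  interface RateProvider {
    long getDesiredRate();
  }

  /** Hypothetical limiter that re-reads its rate from the provider on demand. */
  static final class RefreshingLimiter {
    private final RateProvider provider;
    private long rate;

    RefreshingLimiter(RateProvider provider) {
      this.provider = provider;
      this.rate = provider.getDesiredRate();
    }

    /** Reset the rate if the provider now reports a different value. */
    void update() {
      long desired = provider.getDesiredRate();
      if (desired != rate) {
        rate = desired;
      }
    }

    long getRate() {
      return rate;
    }
  }

  public static void main(String[] args) {
    AtomicLong configured = new AtomicLong(30L * 1024 * 1024);
    RefreshingLimiter limiter = new RefreshingLimiter(configured::get);
    configured.set(0); // simulate the property being changed to "unlimited"
    limiter.update();
    System.out.println("rate = " + limiter.getRate());
  }
}
```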


[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ShawnWalker <gi...@git.apache.org>.
Github user ShawnWalker commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59581242
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.core.util.ratelimit;
    +
    +/** Rate limiter from the Guava library. */
    +public class GuavaRateLimiter implements RateLimiter {
    +  private final com.google.common.util.concurrent.RateLimiter rateLimiter;
    +  private long currentRate;
    +
    +  public GuavaRateLimiter(long initialRate) {
    +    this.currentRate = initialRate;
    --- End diff --
    
    I'm adopting the convention that a non-positive rate should mean "unlimited", and so allowing non-positive values as the current rate.
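
The convention can be sketched as follows. The class `PacedLimiter` and its simplified delay calculation are hypothetical and stand in for the Guava-backed limiter. The only point illustrated is that a non-positive rate short-circuits any throttling.

```java
class UnlimitedConventionSketch {
  /** Hypothetical limiter that computes a delay instead of actually sleeping. */
  static final class PacedLimiter {
    private long currentRate; // permits per second; <= 0 means unlimited

    PacedLimiter(long initialRate) {
      this.currentRate = initialRate;
    }

    void setRate(long newRate) {
      this.currentRate = newRate;
    }

    /** Nanoseconds a caller should wait for the permits; 0 when unlimited. */
    long delayNanos(long permits) {
      if (currentRate <= 0) {
        return 0L; // non-positive rate: never throttle
      }
      return (long) (permits * 1e9 / currentRate);
    }
  }

  public static void main(String[] args) {
    PacedLimiter limiter = new PacedLimiter(0);
    System.out.println("unlimited delay: " + limiter.delayNanos(1_000_000));
    limiter.setRate(1000);
    System.out.println("limited delay:   " + limiter.delayNanos(500));
  }
}
```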


[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ShawnWalker <gi...@git.apache.org>.
Github user ShawnWalker commented on the pull request:

    https://github.com/apache/accumulo/pull/90#issuecomment-210089684
  
    I've made changes to address most of the comments on this thread.  I've also addressed a performance concern that Keith Turner noticed (`PositionedOutputs.PositionedOutputStream` was behaving poorly).
    
    I've additionally fixed an issue with tracing in `TabletServerBatchWriter` that was causing the test `ShellServerIT.trace(...)` to fail for me for reasons unrelated to my changes.  Perhaps I should split that out into a separate issue/patch?



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/accumulo/pull/90


[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59490490
  
    --- Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.server.util.ratelimit;
    +
    +import com.google.common.collect.ImmutableList;
    +import java.util.List;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.atomic.LongAdder;
    +import org.apache.accumulo.core.conf.AccumuloConfiguration;
    +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
    +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
    +import org.apache.accumulo.server.util.time.SimpleTimer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class SharedRateLimiterFactory {
    +  private static final long REPORT_RATE = 60000;
    +  private static final long UPDATE_RATE = 1000;
    +  private static SharedRateLimiterFactory instance = null;
    +  private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
    +  private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
    +
    +  private SharedRateLimiterFactory() {}
    +
    +  public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
    +    synchronized (SharedRateLimiterFactory.class) {
    +      if (instance == null) {
    +        instance = new SharedRateLimiterFactory();
    +
    +        // Update periodically
    +        timer.schedule(new Runnable() {
    --- End diff --
    
    Can we re-poll the configuration and reschedule the task so that the implementation responds to dynamic configuration updates? Having to restart the server to pick up a new configuration value sucks.
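
As a rough illustration of the re-polling idea, the sketch below keeps a cached rate and refreshes it from a `Callable<Long>` whenever a periodic task calls `refresh()`. The class name `RefreshableRate` and its methods are hypothetical and are not taken from the pull request; only the `Callable<Long>` rate source mirrors the quoted code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical holder that re-reads its rate from a supplier each time refresh() runs. */
final class RefreshableRate {
  // Assumed to wrap a configuration lookup, e.g. the configured major compaction throughput.
  private final Callable<Long> rateSource;
  private final AtomicLong currentRate = new AtomicLong();

  RefreshableRate(Callable<Long> rateSource) {
    this.rateSource = rateSource;
    refresh(); // read the initial value eagerly
  }

  /** Intended to be invoked from a periodic task so that configuration changes apply without a restart. */
  void refresh() {
    try {
      currentRate.set(rateSource.call());
    } catch (Exception ex) {
      throw new IllegalStateException(ex);
    }
  }

  /** Returns the most recently polled rate. */
  long getRate() {
    return currentRate.get();
  }
}
```

The cached value only changes when `refresh()` runs, so the polling period bounds how long a configuration change takes to become visible.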



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ShawnWalker <gi...@git.apache.org>.
Github user ShawnWalker commented on the pull request:

    https://github.com/apache/accumulo/pull/90#issuecomment-211546144
  
    With Keith's help, I've added a small end-to-end IT on rate limiting of major compactions.



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on the pull request:

    https://github.com/apache/accumulo/pull/90#issuecomment-209218793
  
    Made a first pass through the code. Wow! Great work for a first contribution @ShawnWalker! Some general themes:
    
    * nit-picky stylistic things
    * Missing javadoc on public classes/methods
    
    New tests for these new classes would make this even better; testing the rate-limiting components and the input/output streams is especially important.
    
    I'll have to go back to reread about the use of `<T extends Class & Interface>` littered everywhere with a fresh mind. First time I've run across it and I don't think I entirely grokked the point.
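
For readers unfamiliar with the `<T extends Class & Interface>` form, here is a minimal, self-contained sketch of a Java intersection bound. The names `Positioned`, `CountingInput`, and `IntersectionBoundDemo` are invented for illustration and do not come from the pull request.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical capability interface, standing in for something like a position-aware stream. */
interface Positioned {
  long position();
}

/** A stream that is both an InputStream and Positioned. */
final class CountingInput extends ByteArrayInputStream implements Positioned {
  CountingInput(byte[] data) {
    super(data);
  }

  @Override
  public long position() {
    return pos; // protected field of ByteArrayInputStream: index of the next byte to read
  }
}

final class IntersectionBoundDemo {
  private IntersectionBoundDemo() {}

  /** T must extend InputStream AND implement Positioned, so both APIs are usable on one reference. */
  static <T extends InputStream & Positioned> long readOneAndReport(T in) {
    try {
      in.read(); // method inherited from InputStream
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    }
    return in.position(); // method declared by Positioned
  }
}
```

The bound lets a method demand a concrete base class plus an extra capability without introducing a new common supertype or resorting to casts.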



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59489739
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.core.file.streams;
    +
    +import java.io.FilterOutputStream;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +
    +public class PositionedOutputs {
    +  public static PositionedOutputStream wrap(final OutputStream fout) {
    +    if (fout instanceof FSDataOutputStream) {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          return ((FSDataOutputStream) fout).getPos();
    +        }
    +      };
    +    } else if (fout instanceof PositionedOutput) {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          return ((PositionedOutput) fout).position();
    +        }
    +      };
    +    } else {
    +      return new PositionedOutputStream(fout) {
    --- End diff --
    
    check for non-null?



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59489718
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.core.file.streams;
    +
    +import java.io.FilterOutputStream;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +
    +public class PositionedOutputs {
    --- End diff --
    
    Missing javadoc on class and methods. If class shouldn't be instantiated, make a private constructor.
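
A minimal sketch of the pattern being requested, assuming a static-helper class that should never be instantiated. `StreamHelpers` and `checked` are hypothetical names; the null check also illustrates the kind of guard asked for in the earlier "check for non-null?" comment.

```java
import java.io.OutputStream;
import java.util.Objects;

/** Static helpers only; this class is not meant to be instantiated. */
final class StreamHelpers {
  /** Private constructor prevents instantiation from outside the class. */
  private StreamHelpers() {
    throw new AssertionError("no instances");
  }

  /**
   * Returns the given stream unchanged after rejecting null early with a clear message.
   *
   * @param out
   *          the stream to validate
   * @return the same stream, guaranteed non-null
   */
  static OutputStream checked(OutputStream out) {
    return Objects.requireNonNull(out, "out must not be null");
  }
}
```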



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59490576
  
    --- Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.server.util.ratelimit;
    +
    +import com.google.common.collect.ImmutableList;
    +import java.util.List;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.atomic.LongAdder;
    +import org.apache.accumulo.core.conf.AccumuloConfiguration;
    +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
    +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
    +import org.apache.accumulo.server.util.time.SimpleTimer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class SharedRateLimiterFactory {
    +  private static final long REPORT_RATE = 60000;
    +  private static final long UPDATE_RATE = 1000;
    +  private static SharedRateLimiterFactory instance = null;
    +  private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
    +  private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
    +
    +  private SharedRateLimiterFactory() {}
    +
    +  public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
    +    synchronized (SharedRateLimiterFactory.class) {
    +      if (instance == null) {
    +        instance = new SharedRateLimiterFactory();
    +
    +        // Update periodically
    +        timer.schedule(new Runnable() {
    +          @Override
    +          public void run() {
    +            instance.update();
    +          }
    +        }, UPDATE_RATE, UPDATE_RATE);
    +
    +        // Report periodically
    +        timer.schedule(new Runnable() {
    +          @Override
    +          public void run() {
    +            instance.report();
    +          }
    +        }, REPORT_RATE, REPORT_RATE);
    +      }
    +      return instance;
    +    }
    +  }
    +
    +  public static SharedRateLimiterFactory getInstance(AccumuloConfiguration conf) {
    +    return getInstance(SimpleTimer.getInstance(conf));
    +  }
    +
    +  /**
    +   * Lookup the RateLimiter associated with the specified name, or create a new one for that name. RateLimiters should be closed when no longer needed.
    +   *
    +   * @param name
    +   *          key for the rate limiter
    +   * @param rateGenerator
    +   *          a function which can be called to get what the current rate for the rate limiter should be.
    +   */
    +  public RateLimiter create(String name, Callable<Long> rateGenerator) {
    +    synchronized (activeLimiters) {
    +      if (activeLimiters.containsKey(name)) {
    +        SharedRateLimiter limiter = activeLimiters.get(name);
    +        return limiter;
    +      } else {
    +        long initialRate;
    +        try {
    +          initialRate = rateGenerator.call();
    +        } catch (Exception ex) {
    +          throw new IllegalStateException(ex);
    +        }
    +        SharedRateLimiter limiter = new SharedRateLimiter(name, rateGenerator, initialRate);
    +        activeLimiters.put(name, limiter);
    +        return limiter;
    +      }
    +    }
    +  }
    +
    +  protected void update() {
    --- End diff --
    
    Some simple javadoc on the below methods/class would be nice.



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ShawnWalker <gi...@git.apache.org>.
Github user ShawnWalker commented on the pull request:

    https://github.com/apache/accumulo/pull/90#issuecomment-211409828
  
    I've reverted the changes to `TabletServerBatchWriter` (which were moved to ACCUMULO-4191), and then squashed the changeset into a single commit.



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ShawnWalker <gi...@git.apache.org>.
Github user ShawnWalker commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59586665
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.core.util.ratelimit;
    +
    +/** Rate limiter from the Guava library. */
    +public class GuavaRateLimiter implements RateLimiter {
    +  private final com.google.common.util.concurrent.RateLimiter rateLimiter;
    +  private long currentRate;
    +
    +  public GuavaRateLimiter(long initialRate) {
    +    this.currentRate = initialRate;
    --- End diff --
    
    There's a hint in the javadocs for `RateLimiter.getRate()`:
    ```java
    /** Get current QPS of the rate limiter, with a nonpositive rate indicating no limit. */
    public long getRate();
    ```
    But I'll add more explicit comments to `GuavaRateLimiter` and to `SharedRateLimiterFactory`.
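
The "nonpositive rate means no limit" convention from that javadoc can be made explicit in one small helper. This is only a sketch; `RateConvention` and `isUnlimited` are hypothetical names, not part of the pull request.

```java
/** Hypothetical helper spelling out the "nonpositive means unlimited" convention. */
final class RateConvention {
  private RateConvention() {}

  /** Returns true when the configured rate (zero or negative) should be treated as "no limit". */
  static boolean isUnlimited(long rate) {
    return rate <= 0;
  }
}
```

Centralizing the check keeps callers from each re-deriving what `0` means, which matters because `0B` is the property's default.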



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59598295
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/FileOperations.java ---
    @@ -55,23 +56,24 @@ public static FileOperations getInstance() {
        */
     
       public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
    -      AccumuloConfiguration tableConf) throws IOException;
    +      RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;
     
       public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
    -      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
    +      RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
     
       /**
        * Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
        *
        */
     
    -  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf)
    -      throws IOException;
    +  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
    --- End diff --
    
    > Not sure exactly what you mean, but what I meant was along the lines of creating a "FileOptions" (or similar) object to pass around instead of the AccumuloConfiguration object which is narrowly applicable to the context of those utility methods (context: operations on files?), and which is composed of the relevant configs from wherever they are sourced.
    
    Something like:
    ```
    T impl = acuConf.getImpl(Property.MY_FACTORY_IMPLEMENTATION);
    ```
    Concretely:
    ```
    RATE_LIMITER_FACTORY("tserver.rate.limiter.factory", SharedRateLimiterFactory.class, PropertyType.IMPL, "...")
    ```
    ```
    RateLimiterFactory factory = acuConf.getImpl(Property.RATE_LIMITER_FACTORY);
    ```
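
One way such a lookup could work is plain reflection over a class name stored in configuration. The sketch below uses a `Map<String,String>` in place of `AccumuloConfiguration`, and `LimiterFactory`, `DefaultLimiterFactory`, and `ImplLoader.getImpl` are all hypothetical; it does not describe an existing Accumulo API.

```java
import java.util.Map;

/** Hypothetical factory interface, not taken from the Accumulo code base. */
interface LimiterFactory {
  String describe();
}

/** Hypothetical default implementation with a public no-argument constructor. */
final class DefaultLimiterFactory implements LimiterFactory {
  public DefaultLimiterFactory() {}

  @Override
  public String describe() {
    return "default";
  }
}

final class ImplLoader {
  private ImplLoader() {}

  /** Instantiates the class named under key, falling back to defaultClassName when the key is unset. */
  static <T> T getImpl(Map<String,String> conf, String key, Class<T> type, String defaultClassName) {
    String className = conf.getOrDefault(key, defaultClassName);
    try {
      // asSubclass fails fast with ClassCastException if the configured class has the wrong type.
      return Class.forName(className).asSubclass(type).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException ex) {
      throw new IllegalArgumentException("Cannot instantiate " + className + " for " + key, ex);
    }
  }
}
```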



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59490771
  
    --- Diff: server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---
    @@ -3089,4 +3093,20 @@ public void removeBulkImportState(List<String> files) {
         bulkImportStatus.removeBulkImportStatus(files);
       }
     
    +  private static final String MAJC_READ_LIMITER_KEY = "tserv_majc_read";
    +  private static final String MAJC_WRITE_LIMITER_KEY = "tserv_majc_write";
    +  private final Callable<Long> rateProvider = new Callable<Long>() {
    +    @Override
    +    public Long call() throws Exception {
    +      return getConfiguration().getMemoryInBytes(Property.TSERV_MAJC_THROUGHPUT);
    +    }
    +  };
    +
    +  public final RateLimiter getMajorCompactionReadLimiter() {
    --- End diff --
    
    Javadoc, please



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on the pull request:

    https://github.com/apache/accumulo/pull/90#issuecomment-211419757
  
    >  I suppose the test would make sure a compaction doesn't run too fast?
    
    Yeah, I was thinking about how best to test this. You can only reasonably assert a lower-bound on compaction time (to avoid performance skew on certain hosts). Maybe turning off compression for a table, writing a bunch of data and then asserting that a compaction takes at least X time is easiest. You'll still have to account for "compression" from the run-length encoding, but at least that should be uniform across hosts.
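
The lower bound described above is simple arithmetic: bytes moved divided by the configured rate. The sketch below computes it with a tolerance factor to absorb effects such as run-length encoding; `CompactionTimeBound`, `minExpectedMillis`, and the tolerance parameter are assumptions made for illustration, not part of the eventual integration test.

```java
/** Hypothetical helper for deriving a lower bound on compaction time in a test. */
final class CompactionTimeBound {
  private CompactionTimeBound() {}

  /**
   * Smallest plausible duration, in milliseconds, for moving the given number of bytes at
   * bytesPerSecond, scaled by a tolerance in (0, 1] to allow for data that shrinks on disk.
   */
  static long minExpectedMillis(long bytes, long bytesPerSecond, double tolerance) {
    if (bytesPerSecond <= 0) {
      return 0L; // unlimited: no lower bound can be asserted
    }
    double seconds = (double) bytes / bytesPerSecond;
    return (long) (seconds * 1000.0 * tolerance);
  }
}
```

For example, 64 MiB compacted under an 8 MiB/s limit with a tolerance of 0.5 should take at least 4000 ms; a test would assert that the measured duration is not below that value, and would assert nothing about an upper bound.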



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59490394
  
    --- Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.server.util.ratelimit;
    +
    +import com.google.common.collect.ImmutableList;
    +import java.util.List;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.atomic.LongAdder;
    +import org.apache.accumulo.core.conf.AccumuloConfiguration;
    +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
    +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
    +import org.apache.accumulo.server.util.time.SimpleTimer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class SharedRateLimiterFactory {
    +  private static final long REPORT_RATE = 60000;
    +  private static final long UPDATE_RATE = 1000;
    +  private static SharedRateLimiterFactory instance = null;
    +  private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
    +  private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
    +
    +  private SharedRateLimiterFactory() {}
    +
    +  public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
    +    synchronized (SharedRateLimiterFactory.class) {
    +      if (instance == null) {
    +        instance = new SharedRateLimiterFactory();
    +
    +        // Update periodically
    +        timer.schedule(new Runnable() {
    +          @Override
    +          public void run() {
    +            instance.update();
    +          }
    +        }, UPDATE_RATE, UPDATE_RATE);
    +
    +        // Report periodically
    +        timer.schedule(new Runnable() {
    +          @Override
    +          public void run() {
    +            instance.report();
    +          }
    +        }, REPORT_RATE, REPORT_RATE);
    +      }
    +      return instance;
    +    }
    +  }
    +
    +  public static SharedRateLimiterFactory getInstance(AccumuloConfiguration conf) {
    +    return getInstance(SimpleTimer.getInstance(conf));
    --- End diff --
    
    Reusing the SimpleTimer singleton instance might have unexpected implications on the rest of the server. I think having a Timer instance specifically for rate limiting would be good.
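
A dedicated timer could be as small as a single-threaded scheduled executor with its own daemon thread. This sketch uses only `java.util.concurrent`; the class name `RateLimitTimers` and the thread name are hypothetical, and the pull request may have solved this differently.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

/** Hypothetical factory for a timer used only by rate-limiting tasks. */
final class RateLimitTimers {
  private RateLimitTimers() {}

  /** Creates a single-threaded scheduler whose daemon thread will not keep the JVM alive. */
  static ScheduledExecutorService newDedicatedTimer() {
    ThreadFactory daemonFactory = runnable -> {
      Thread thread = new Thread(runnable, "rate-limiter-updates");
      thread.setDaemon(true);
      return thread;
    };
    return Executors.newSingleThreadScheduledExecutor(daemonFactory);
  }
}
```

Keeping update and report tasks on their own thread means a slow rate-limiter task cannot delay unrelated server tasks that share a common timer, and vice versa.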



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on the pull request:

    https://github.com/apache/accumulo/pull/90#issuecomment-211414852
  
    I think everything I asked about has been taken care of. Thanks for that, Shawn.
    
    The only thing I don't see (and I didn't say it explicitly earlier, so I don't think it's a blocker to merge this in) is a high-level test. I see you added some tests for the rate-limiter piece. I'm wondering if we could make an integration test specifically to test this feature. It's nice when we have a general test class (built around a minicluster) available so that we can easily test potential bugs and add new tests easily.
    
    @keith-turner LMK if you have time to merge this in and run the tests. Otherwise, I'll kick off something myself.



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on the pull request:

    https://github.com/apache/accumulo/pull/90#issuecomment-211402498
  
    +1



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on the pull request:

    https://github.com/apache/accumulo/pull/90#issuecomment-211418879
  
    @joshelser I can merge it.  An end-to-end test would be nice to detect regressions.   I suppose the test would make sure a compaction doesn't run too fast?



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59489957
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with this
    + * work for additional information regarding copyright ownership. The ASF
    + * licenses this file to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    + * License for the specific language governing permissions and limitations under
    + * the License.
    + */
    +package org.apache.accumulo.core.file.streams;
    +
    +import java.io.FilterOutputStream;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
    +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
    +
    +/** A decorator for {@code OutputStream} which limits the rate at which data may be written. */
    --- End diff --
    
    Expand the comment (style-nit)



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59579078
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/FileOperations.java ---
    @@ -55,23 +56,24 @@ public static FileOperations getInstance() {
        */
     
       public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
    -      AccumuloConfiguration tableConf) throws IOException;
    +      RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;
     
       public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
    -      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
    +      RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
     
       /**
        * Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
        *
        */
     
    -  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf)
    -      throws IOException;
    +  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
    --- End diff --
    
    > In many parts of our internal code, I would prefer we grab what we need from AccumuloConfiguration, combine it with any additional configuration appropriate for that specific context, and create a new context-specific configuration object to pass around within that context.
    
    Somehow get the RateLimiterFactory directly from the config? That'd be a stop-gap.
    
    > I was thinking perhaps a fluent syntax for opening readers/writers would make sense.
    > That said, introducing such a change seemed a bit ambitious.
    
    Yeah, I totally understand. Finding some middle ground to avoid lots of unrelated changes to your actual feature is ideal.



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ShawnWalker <gi...@git.apache.org>.
Github user ShawnWalker commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59571157
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/FileOperations.java ---
    @@ -55,23 +56,24 @@ public static FileOperations getInstance() {
        */
     
       public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
    -      AccumuloConfiguration tableConf) throws IOException;
    +      RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;
     
       public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
    -      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
    +      RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
     
       /**
        * Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
        *
        */
     
    -  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf)
    -      throws IOException;
    +  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
    --- End diff --
    
    I was thinking perhaps a fluent syntax for opening readers/writers would make sense.
    
    E.g. so that it admits syntax like the following:
    ```java
    FileSKVIterator iterator = fileOperations.openReader(accumuloConfiguration)
        .ofFile(filename, fileSystem, fsConfiguration)
        .withRateLimiter(rateLimiter) // optional
        .withCache(dataCache, indexCache) // optional
        .forScanning(range, columnFamilies, inclusive);
    ```
    
    Unfortunately, introducing such a change seemed a bit much for the scope of what I wanted to accomplish.
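
For illustration only, here is a minimal sketch of how a fluent builder of that shape might be structured. Every name in it (`FluentOpenSketch`, `Limiter`, `OpenReaderBuilder`) is hypothetical and is not part of Accumulo; the file, range, and cache arguments are reduced to strings and a flag, and the terminal call returns a description rather than a real `FileSKVIterator`.

```java
/** Hypothetical sketch of a fluent reader-opening API; none of these types are Accumulo's. */
final class FluentOpenSketch {

  /** Stand-in for a rate limiter; null means "unlimited", as in the pull request. */
  interface Limiter {
    void acquire(long permits);
  }

  /** Collects required and optional arguments, then "opens" the reader. */
  static final class OpenReaderBuilder {
    private String file;      // required
    private Limiter limiter;  // optional, null = no limiting
    private boolean useCache; // optional, defaults to false

    OpenReaderBuilder ofFile(String file) {
      this.file = file;
      return this;
    }

    OpenReaderBuilder withRateLimiter(Limiter limiter) {
      this.limiter = limiter;
      return this;
    }

    OpenReaderBuilder withCache() {
      this.useCache = true;
      return this;
    }

    /** Terminal operation: validates state and returns a description instead of a real iterator. */
    String forScanning(String range) {
      if (file == null) {
        throw new IllegalStateException("ofFile(...) must be called first");
      }
      return "scan " + file + " over " + range + " limited=" + (limiter != null) + " cached=" + useCache;
    }
  }

  static OpenReaderBuilder openReader() {
    return new OpenReaderBuilder();
  }

  public static void main(String[] args) {
    // Optional steps can be skipped without adding a null to the call.
    System.out.println(openReader().ofFile("f.rf").withCache().forScanning("[a,b]"));
  }
}
```

With a builder like this, adding a new optional argument means adding one method, and existing call sites need no changes.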



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ShawnWalker <gi...@git.apache.org>.
Github user ShawnWalker commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59589138
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/FileOperations.java ---
    @@ -55,23 +56,24 @@ public static FileOperations getInstance() {
        */
     
       public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
    -      AccumuloConfiguration tableConf) throws IOException;
    +      RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;
     
       public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
    -      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
    +      RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
     
       /**
        * Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
        *
        */
     
    -  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf)
    -      throws IOException;
    +  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
    --- End diff --
    
    To @ctubbsii's comment:  Sounds interesting, but is it relevant to my pull request?  It sounds like you're advocating a more general refactoring of `AccumuloConfiguration` handling.



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by ShawnWalker <gi...@git.apache.org>.
Github user ShawnWalker commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59579822
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.core.file.streams;
    +
    +import java.io.FilterOutputStream;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +
    +public class PositionedOutputs {
    +  public static PositionedOutputStream wrap(final OutputStream fout) {
    +    if (fout instanceof FSDataOutputStream) {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          return ((FSDataOutputStream) fout).getPos();
    +        }
    +      };
    +    } else if (fout instanceof PositionedOutput) {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          return ((PositionedOutput) fout).position();
    +        }
    +      };
    +    } else {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          throw new UnsupportedOperationException("Underlying stream does not support position()");
    +        }
    +      };
    +    }
    +  }
    +
    +  public static abstract class PositionedOutputStream extends FilterOutputStream implements PositionedOutput {
    --- End diff --
    
    What I really want to say is "This method returns an `OutputStream` which implements `PositionedOutput`."  But there's not really a way to say that in Java.
    
    I'm probably just being too clever with `PositionedOutput` anyways.  The underlying issue is that `BCFile.Writer` needs an `OutputStream` implementing `DataOutput` with some facsimile of the `getPos()` method from `FSDataOutputStream`.  `RateLimitedOutputStream` doesn't want to care about or interfere with either of these.  But there doesn't seem to be any way to separate concerns cleanly.  `getPos()` doesn't have an interface (like `Seekable`), and `java.io.DataOutputStream` (and so `FSDataOutputStream`) isn't implemented in a way which readily supports subclassing/inheritance.
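
As a hedged aside, a generic bound can express "an `OutputStream` that also reports a position" on the consuming side, although it does not help a factory such as `wrap(...)` whose concrete return type is chosen at runtime. The sketch below assumes a hypothetical `Positioned` interface standing in for `PositionedOutput`, and a decorator that counts bytes itself rather than delegating to `getPos()`; neither is taken from the pull request.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical sketch; Positioned stands in for the PositionedOutput interface in the diff. */
final class PositionSketch {

  interface Positioned {
    long position() throws IOException;
  }

  /** Counts bytes itself, so it can report a position for any underlying stream. */
  static final class CountingStream extends FilterOutputStream implements Positioned {
    private long count = 0;

    CountingStream(OutputStream out) {
      super(out);
    }

    @Override
    public void write(int b) throws IOException {
      out.write(b);
      count++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      // Delegate the whole range at once instead of byte-by-byte.
      out.write(b, off, len);
      count += len;
    }

    @Override
    public long position() {
      return count;
    }
  }

  /** The bound requires an argument that is both an OutputStream and Positioned. */
  static <T extends OutputStream & Positioned> long writeAndReport(T stream, byte[] data) throws IOException {
    stream.write(data);
    return stream.position();
  }

  public static void main(String[] args) throws IOException {
    CountingStream s = new CountingStream(new ByteArrayOutputStream());
    System.out.println(writeAndReport(s, new byte[] {1, 2, 3}));
  }
}
```

A counting decorator like this only reports bytes written through it, so it would disagree with `getPos()` if the underlying stream had already been written to before wrapping.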



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59584225
  
    --- Diff: server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.server.util.ratelimit;
    +
    +import com.google.common.collect.ImmutableList;
    +import java.util.List;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.atomic.LongAdder;
    +import org.apache.accumulo.core.conf.AccumuloConfiguration;
    +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
    +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
    +import org.apache.accumulo.server.util.time.SimpleTimer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class SharedRateLimiterFactory {
    +  private static final long REPORT_RATE = 60000;
    +  private static final long UPDATE_RATE = 1000;
    +  private static SharedRateLimiterFactory instance = null;
    +  private final Logger logger = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
    +  private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
    +
    +  private SharedRateLimiterFactory() {}
    +
    +  public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
    +    synchronized (SharedRateLimiterFactory.class) {
    +      if (instance == null) {
    +        instance = new SharedRateLimiterFactory();
    +
    +        // Update periodically
    +        timer.schedule(new Runnable() {
    +          @Override
    +          public void run() {
    +            instance.update();
    +          }
    +        }, UPDATE_RATE, UPDATE_RATE);
    +
    +        // Report periodically
    +        timer.schedule(new Runnable() {
    +          @Override
    +          public void run() {
    +            instance.report();
    +          }
    +        }, REPORT_RATE, REPORT_RATE);
    +      }
    +      return instance;
    +    }
    +  }
    +
    +  public static SharedRateLimiterFactory getInstance(AccumuloConfiguration conf) {
    +    return getInstance(SimpleTimer.getInstance(conf));
    +  }
    +
    +  /**
    +   * Lookup the RateLimiter associated with the specified name, or create a new one for that name. RateLimiters should be closed when no longer needed.
    +   *
    +   * @param name
    +   *          key for the rate limiter
    +   * @param rateGenerator
    +   *          a function which can be called to get what the current rate for the rate limiter should be.
    +   */
    +  public RateLimiter create(String name, Callable<Long> rateGenerator) {
    +    synchronized (activeLimiters) {
    +      if (activeLimiters.containsKey(name)) {
    +        SharedRateLimiter limiter = activeLimiters.get(name);
    +        return limiter;
    +      } else {
    +        long initialRate;
    +        try {
    +          initialRate = rateGenerator.call();
    +        } catch (Exception ex) {
    +          throw new IllegalStateException(ex);
    +        }
    +        SharedRateLimiter limiter = new SharedRateLimiter(name, rateGenerator, initialRate);
    +        activeLimiters.put(name, limiter);
    +        return limiter;
    +      }
    +    }
    +  }
    +
    +  protected void update() {
    +    List<SharedRateLimiter> limiters;
    +    synchronized (activeLimiters) {
    +      limiters = ImmutableList.copyOf(activeLimiters.values());
    +    }
    +    for (SharedRateLimiter limiter : limiters) {
    +      limiter.update();
    +    }
    +  }
    +
    +  protected void report() {
    +    List<SharedRateLimiter> limiters;
    +    synchronized (activeLimiters) {
    +      limiters = ImmutableList.copyOf(activeLimiters.values());
    +    }
    +    for (SharedRateLimiter limiter : limiters) {
    +      limiter.report();
    +    }
    +  }
    +
    +  protected class SharedRateLimiter extends GuavaRateLimiter {
    +    private final LongAdder permitsAcquired = new LongAdder();
    +    private final Callable<Long> rateCallable;
    +    private final String name;
    +
    +    SharedRateLimiter(String name, Callable<Long> rateCallable, long initialRate) {
    +      super(initialRate);
    +      this.name = name;
    +      this.rateCallable = rateCallable;
    +    }
    +
    +    @Override
    +    public void acquire(long permits) {
    +      super.acquire(permits);
    +      permitsAcquired.add(permits);
    +    }
    +
    +    public void update() {
    +      try {
    +        // Reset rate if needed
    +        long rate = rateCallable.call();
    +        if (rate != getRate()) {
    +          setRate(rate);
    +        }
    +      } catch (Exception ex) {
    +        logger.debug("Failed to update rate limiter", ex);
    --- End diff --
    
    OIC, it's because of the `Callable`. No suggestions; this is fine.
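
To spell out the point about `Callable`: `Callable.call()` is declared `throws Exception`, so any caller has to catch or rethrow, whereas `java.util.function.LongSupplier` declares no checked exceptions. The sketch below is illustrative only; `RateSourceSketch` and its methods are hypothetical names, and the fallback-value handling is a simplification of what the diff does (log at debug and leave the rate unchanged).

```java
import java.util.concurrent.Callable;
import java.util.function.LongSupplier;

/** Hypothetical sketch contrasting two ways of supplying the current rate. */
final class RateSourceSketch {

  /** Callable.call() is declared "throws Exception", so a catch (or rethrow) is unavoidable. */
  static long fromCallable(Callable<Long> source, long fallback) {
    try {
      return source.call();
    } catch (Exception ex) {
      // Simplification: return a fallback instead of logging and keeping the old rate.
      return fallback;
    }
  }

  /** LongSupplier declares no checked exceptions, so no try/catch is required. */
  static long fromSupplier(LongSupplier source) {
    return source.getAsLong();
  }

  public static void main(String[] args) {
    System.out.println(fromCallable(() -> 30L, 0L));
    System.out.println(fromSupplier(() -> 30L));
  }
}
```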



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59489344
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java ---
    @@ -139,11 +146,12 @@ public long getStartPos() throws IOException {
        *
        */
       public static class Reader implements BlockFileReader {
    +    private final RateLimiter readLimiter;
         private BCFile.Reader _bc;
         private String fileName = "not_available";
         private BlockCache _dCache = null;
         private BlockCache _iCache = null;
    -    private FSDataInputStream fin = null;
    +    private Closeable fin = null;
    --- End diff --
    
    Any reason for `Closeable` instead of `InputStream`?



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59489818
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.core.file.streams;
    +
    +import java.io.FilterOutputStream;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +
    +public class PositionedOutputs {
    +  public static PositionedOutputStream wrap(final OutputStream fout) {
    +    if (fout instanceof FSDataOutputStream) {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          return ((FSDataOutputStream) fout).getPos();
    +        }
    +      };
    +    } else if (fout instanceof PositionedOutput) {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          return ((PositionedOutput) fout).position();
    +        }
    +      };
    +    } else {
    +      return new PositionedOutputStream(fout) {
    +        @Override
    +        public long position() throws IOException {
    +          throw new UnsupportedOperationException("Underlying stream does not support position()");
    +        }
    +      };
    +    }
    +  }
    +
    +  public static abstract class PositionedOutputStream extends FilterOutputStream implements PositionedOutput {
    --- End diff --
    
    Would prefer to see this in its own file since it's used outside of this class.



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59489073
  
    --- Diff: core/src/main/java/org/apache/accumulo/core/file/FileOperations.java ---
    @@ -55,23 +56,24 @@ public static FileOperations getInstance() {
        */
     
       public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
    -      AccumuloConfiguration tableConf) throws IOException;
    +      RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;
     
       public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
    -      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
    +      RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
     
       /**
        * Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
        *
        */
     
    -  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf)
    -      throws IOException;
    +  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
    --- End diff --
    
    I wonder if it wouldn't be a good idea to consolidate the various "Accumulo" configuration objects into one struct-like object. Presently, that would contain the AccumuloConfiguration and the new RateLimiter. This would prevent us from having to change the method signature every time something new is added here (so far, there are a lot of modifications from this already).
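
A rough sketch of the struct-like object suggested above, under stated assumptions: `ReaderOptionsSketch`, `ReaderOptions`, and `Limiter` are hypothetical names, a plain `Map` stands in for `AccumuloConfiguration`, and `openReader` returns a string rather than a real reader.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical parameter-object sketch; these are not Accumulo's real types. */
final class ReaderOptionsSketch {

  /** Stand-in for the RateLimiter interface added by the pull request. */
  interface Limiter {
    void acquire(long permits);
  }

  /** Immutable bundle of everything an open call needs besides the file itself. */
  static final class ReaderOptions {
    private final Map<String,String> tableConf; // stand-in for AccumuloConfiguration
    private final Limiter readLimiter;          // null means unlimited

    ReaderOptions(Map<String,String> tableConf, Limiter readLimiter) {
      this.tableConf = Collections.unmodifiableMap(new HashMap<>(tableConf));
      this.readLimiter = readLimiter;
    }

    /** Returns a copy with a different limiter, leaving this instance untouched. */
    ReaderOptions withReadLimiter(Limiter limiter) {
      return new ReaderOptions(tableConf, limiter);
    }

    Map<String,String> tableConf() {
      return tableConf;
    }

    Limiter readLimiter() {
      return readLimiter;
    }
  }

  /** This signature stays the same if ReaderOptions later gains another field. */
  static String openReader(String file, ReaderOptions options) {
    return file + (options.readLimiter() == null ? " (unlimited)" : " (rate limited)");
  }

  public static void main(String[] args) {
    ReaderOptions base = new ReaderOptions(new HashMap<>(), null);
    System.out.println(openReader("f.rf", base));
    System.out.println(openReader("f.rf", base.withReadLimiter(permits -> {})));
  }
}
```

The trade-off is that every caller must construct the options object, but later additions touch only that one class instead of every `openReader(...)` call site.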



[GitHub] accumulo pull request: ACCUMULO-4187: Added rate limiting for majo...

Posted by keith-turner <gi...@git.apache.org>.
Github user keith-turner commented on the pull request:

    https://github.com/apache/accumulo/pull/90#issuecomment-211398304
  
    @ShawnWalker I suspect the issue you fixed that was causing ShellServerIT to fail was introduced by ACCUMULO-1755.

