Posted to user@accumulo.apache.org by Ryan Cunningham <ry...@gmail.com> on 2016/08/12 22:02:03 UTC

Accumulo Limiting Iterator Help

Hello,



I'm trying to write an iterator that gets the top N sorted entries for a
given range over sharded data. I created a custom iterator that extends
SkippingIterator and made it so that it will return the first N entries for
each tablet. After N entries, I have the source iterator seek to the end
key of the specific range since it shouldn't return any other entries for
that tablet.



  @Override
  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
    super.init(source, options, env);
    String o = options.get(NUM_SCANS_STRING_NAME);
    numScans = o == null ? 10 : Integer.parseInt(o);
    String n = options.get(NUM_ENTRIES_STRING_NAME);
    numEntriesPerRange = n == null ? Integer.MAX_VALUE : Integer.parseInt(n);
    numEntries = 0;
  }

  // this is only ever called immediately after getting "next" entry
  @Override
  protected void consume() throws IOException {
    if (numEntries < numEntriesPerRange) {
      ++numEntries;
      return;
    }
    int count = 0;
    while (getSource().hasTop()) {
      if (count < numScans) {
        ++count;
        getSource().next(); // scan
      } else {
        // too many scans, just seek to end of range
        Key lastKey = latestRange.getEndKey() == null
            ? new Key(new Text(String.valueOf(Character.MAX_VALUE)))
            : latestRange.getEndKey().followingKey(PartialKey.ROW);
        getSource().seek(new Range(lastKey, true, lastKey, false), latestColumnFamilies, latestInclusive);
      }
    }
  }

  @Override
  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
    // save parameters for future internal seeks
    latestRange = range;
    latestColumnFamilies = columnFamilies;
    latestInclusive = inclusive;

    super.seek(range, columnFamilies, inclusive);

    if (getSource().hasTop()) {
      if (range.beforeStartKey(getSource().getTopKey()))
        consume();
    }
  }



I did some initial testing and it seems to work as expected, bringing back
N * (number of tablets) results. However, when I increase the limit past a
certain point, the limit stops being applied and I get all entries back
instead of the limited count. I also sometimes see this error; I looked
online, but I'm not sure whether it's related:



16/08/12 20:54:22 WARN transport.TIOStreamTransport: Error closing output stream.
java.io.IOException: The stream is closed
        at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:118)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
        at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
        at org.apache.thrift.transport.TIOStreamTransport.close(TIOStreamTransport.java:110)
        at org.apache.thrift.transport.TFramedTransport.close(TFramedTransport.java:89)
        at org.apache.accumulo.core.client.impl.ThriftTransportPool$CachedTTransport.close(ThriftTransportPool.java:312)
        at org.apache.accumulo.core.client.impl.ThriftTransportPool.returnTransport(ThriftTransportPool.java:584)
        at org.apache.accumulo.core.util.ThriftUtil.returnClient(ThriftUtil.java:134)
        at org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator.doLookup(TabletServerBatchReaderIterator.java:714)
        at org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator$QueryTask.run(TabletServerBatchReaderIterator.java:376)
        at org.apache.accumulo.trace.instrument.TraceRunnable.run(TraceRunnable.java:47)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at org.apache.accumulo.trace.instrument.TraceRunnable.run(TraceRunnable.java:47)
        at org.apache.accumulo.core.util.LoggingRunnable.run(LoggingRunnable.java:34)
        at java.lang.Thread.run(Thread.java:745)



Does anyone have any idea why the iterator would work for lower values of N
but not higher ones? Also, I don’t have a lot of experience with iterators
and am not confident that the seek in consume() is right. What is the best
way to skip the rest of a range in an iterator? Or is this not feasible?



Any help would be greatly appreciated!

Thanks,

Ryan

Re: Accumulo Limiting Iterator Help

Posted by Ryan Cunningham <ry...@gmail.com>.
Oh ok, that is much simpler. Still wrapping my head around iterators.

Good point about the re-init of the iterator. I think in my case the scan
batch was filling up before the iterator hit the limit, so a new iterator
instance was created and started counting again from 0. I was able to work
around this by increasing the table setting 'table.scan.max.memory', which
seemed to increase the batch size, and the iterator then worked for the
higher limits. Still, it doesn't seem that this iterator will work
reliably, especially if there are other conditions that can cause a new
iterator instance to be created. I guess I can see why iterator logic that
uses the range is discouraged haha.
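
The batch effect described above can be illustrated with a small stand-alone simulation. This is only a sketch under simplifying assumptions, not Accumulo code: `ReinitDemo`, `LimitingIterator`, `scan`, and the entry-count `batchSize` are all hypothetical names invented for the illustration, and a real tablet server presumably bounds a batch by buffered memory (the 'table.scan.max.memory' setting) rather than by an entry count. The point it shows is that a counter kept in iterator instance state is lost whenever the iterator is rebuilt at a batch boundary.

```java
import java.util.ArrayList;
import java.util.List;

final class ReinitDemo {

    /** Hypothetical stand-in for a limiting iterator; it is NOT Accumulo's API. */
    static final class LimitingIterator {
        private final List<String> data; // sorted keys of one "tablet"
        private final int limit;         // N, the number of entries to emit
        private int pos;                 // current position in the data
        private int emitted = 0;         // instance state: lost on re-creation

        LimitingIterator(List<String> data, int startPos, int limit) {
            this.data = data;
            this.pos = startPos;
            this.limit = limit;
        }

        boolean hasTop() { return pos < data.size() && emitted < limit; }
        String getTop() { return data.get(pos); }
        void next() { pos++; emitted++; }
    }

    /**
     * Simulates a scan that rebuilds the iterator after every full batch and
     * resumes right after the last key returned (assumed behavior, simplified).
     */
    static List<String> scan(List<String> data, int limit, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<String> results = new ArrayList<>();
        int resumeAt = 0;
        while (true) {
            // A fresh instance: its counter starts again from 0.
            LimitingIterator it = new LimitingIterator(data, resumeAt, limit);
            int inBatch = 0;
            while (it.hasTop() && inBatch < batchSize) {
                results.add(it.getTop());
                it.next();
                inBatch++;
            }
            resumeAt += inBatch;
            if (inBatch < batchSize) {
                // The iterator stopped on its own (limit reached or data exhausted).
                return results;
            }
            // Otherwise the batch filled up first, so the loop rebuilds the iterator.
        }
    }

    public static void main(String[] args) {
        List<String> data = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            data.add(String.format("row%03d", i));
        }
        System.out.println(scan(data, 5, 10).size());  // limit below batch size
        System.out.println(scan(data, 50, 10).size()); // limit above batch size
    }
}
```

With 100 entries and a batch of 10, a limit of 5 is honored, while a limit of 50 is never reached inside any single batch, so all 100 entries come back. Raising the batch size above the limit hides the problem in this model but does not remove it.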

Thanks for the help Dylan!

On Fri, Aug 12, 2016 at 8:39 PM, Dylan Hutchison <dhutchis@cs.washington.edu> wrote:

> Hi Ryan,
>
> I think you could achieve the behavior you described more simply by
> overriding hasTop() and returning `false` once your iterator has seen and
> emitted N entries.  No need to re-seek the parent iterator to a singleton
> range, since that will have the same effect as hasTop() == false.  Also,
> you're not guaranteed that the singleton range is valid, if the seek range
> has an exclusive end key.
>
> I couldn't follow the meaning behind the `numEntriesPerRange` and
> `numScans` variables, but hopefully the above advice helps.  I'm also not
> sure about the IOException.
>
> Keep in mind that Accumulo can take down your iterator and re-create it at
> any point.  When it does so, it re-inits and then re-seeks your iterator to
> a position immediately after the last key returned.  If this happens in the
> middle of your iterator counting N entries then it will start counting
> again from 0.  See the iterator design section
> <https://accumulo.apache.org/1.7/accumulo_user_manual#_iterator_design>
> in the manual for more info on when this happens.
>
> Cheers, Dylan

Re: Accumulo Limiting Iterator Help

Posted by Dylan Hutchison <dh...@cs.washington.edu>.
Hi Ryan,

I think you could achieve the behavior you described more simply by
overriding hasTop() and returning `false` once your iterator has seen and
emitted N entries.  No need to re-seek the parent iterator to a singleton
range, since that will have the same effect as hasTop() == false.  Also,
you're not guaranteed that the singleton range is valid, if the seek range
has an exclusive end key.
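
A minimal sketch of the hasTop() approach follows. To keep it runnable without Accumulo on the classpath, it uses a hypothetical, simplified interface (`SimpleIterator`) and a list-backed source (`ListSource`) in place of Accumulo's SortedKeyValueIterator; `FirstNDemo`, `FirstNIterator`, and `drain` are likewise invented names. In a real iterator the same idea would presumably go into a WrappingIterator subclass that overrides hasTop() and next().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FirstNDemo {

    /** Hypothetical, simplified stand-in for a sorted key/value iterator. */
    interface SimpleIterator {
        boolean hasTop();
        String getTopKey();
        void next();
    }

    /** Source iterator over an already sorted list of keys. */
    static final class ListSource implements SimpleIterator {
        private final List<String> keys;
        private int pos = 0;

        ListSource(List<String> keys) { this.keys = keys; }

        @Override public boolean hasTop() { return pos < keys.size(); }
        @Override public String getTopKey() { return keys.get(pos); }
        @Override public void next() { pos++; }
    }

    /** Wrapper that reports no top entry once N entries have been emitted. */
    static final class FirstNIterator implements SimpleIterator {
        private final SimpleIterator source;
        private final int limit;
        private int emitted = 0;

        FirstNIterator(SimpleIterator source, int limit) {
            this.source = source;
            this.limit = limit;
        }

        // No re-seek needed: the consumer simply sees hasTop() == false.
        @Override public boolean hasTop() { return emitted < limit && source.hasTop(); }
        @Override public String getTopKey() { return source.getTopKey(); }
        @Override public void next() { source.next(); emitted++; }
    }

    /** Collects every key the iterator is willing to emit. */
    static List<String> drain(SimpleIterator it) {
        List<String> out = new ArrayList<>();
        while (it.hasTop()) {
            out.add(it.getTopKey());
            it.next();
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c", "d");
        System.out.println(drain(new FirstNIterator(new ListSource(keys), 2)));
    }
}
```

The counter here still lives in instance state, so this sketch only behaves as intended while the same iterator instance stays alive for the whole scan of a range.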

I couldn't follow the meaning behind the `numEntriesPerRange` and
`numScans` variables, but hopefully the above advice helps.  I'm also not
sure about the IOException.

Keep in mind that Accumulo can take down your iterator and re-create it at
any point.  When it does so, it re-inits and then re-seeks your iterator to
a position immediately after the last key returned.  If this happens in the
middle of your iterator counting N entries then it will start counting
again from 0.  See the iterator design section
<https://accumulo.apache.org/1.7/accumulo_user_manual#_iterator_design> in
the manual for more info on when this happens.

Cheers, Dylan
