Posted to codereview@trafodion.apache.org by eowhadi <gi...@git.apache.org> on 2016/04/11 19:35:06 UTC

[GitHub] incubator-trafodion pull request: [TRAFODION-1421] Implement paral...

GitHub user eowhadi opened a pull request:

    https://github.com/apache/incubator-trafodion/pull/422

    [TRAFODION-1421] Implement parallel scanner primitive

    new CQD: hbase_dop_parallel_scanner
    - default 0.0 = disabled
    - from 0.x to 1.0 -> the DOP will be dynamically assessed as a percentage of the regions to scan. If there are 10 regions and the cqd is .3 (30%), then 3 threads will be used...
    - from 2.0 and above, the ceiling is taken and used as a fixed number of threads.
    The advantage of parallel scanner parallelism over ESP is resource consumption: the parallel scanner just creates threads, while ESPs are full processes with high memory consumption.
    For now, this feature is just a primitive that can be exercised manually. The next obvious step is to make the compiler aware of it and pick it over ESP DOP when appropriate...
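    As a rough sketch of the CQD semantics described above (the class and method names are hypothetical illustrations, not code from the patch; the description does not specify values between 1.0 and 2.0, so the sketch falls back to a serial scan there):

    ```java
    // Hypothetical illustration of the hbase_dop_parallel_scanner semantics
    // described in the PR text; not code from the patch.
    public class ParallelScanDop {
        // cqd: value of hbase_dop_parallel_scanner; regions: regions to scan
        public static int computeThreads(double cqd, int regions) {
            if (cqd > 0.0 && cqd <= 1.0)
                // fraction of the regions to scan: .3 with 10 regions -> 3 threads
                return Math.max(1, (int) Math.round(cqd * regions));
            if (cqd >= 2.0)
                // fixed thread count: take the ceiling of the cqd value
                return (int) Math.ceil(cqd);
            // 0.0 = disabled (serial scan); the (1.0, 2.0) range is unspecified above
            return 1;
        }

        public static void main(String[] args) {
            System.out.println(computeThreads(0.3, 10)); // 3, per the example above
            System.out.println(computeThreads(2.5, 10)); // 3 (ceiling of 2.5)
        }
    }
    ```
    
    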

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/eowhadi/incubator-trafodion parallelScan

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-trafodion/pull/422.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #422
    
----
commit a7f9a628179fcd4077a566f558e653b1380eabc6
Author: Eric Owhadi <er...@esgyn.com>
Date:   2016-04-11T16:59:23Z

    [TRAFODION-1421] Implement parallel scanner primitive
    new CQD: hbase_dop_parallel_scanner
    - default 0.0 = disabled
    - from 0.x to 1.0 -> the DOP will be dynamically assessed as a percentage of the regions to scan. If there are 10 regions and the cqd is .3 (30%), then 3 threads will be used...
    - from 2.0 and above, the ceiling is taken and used as a fixed number of threads.
    The advantage of parallel scanner parallelism over ESP is resource consumption: the parallel scanner just creates threads, while ESPs are full processes with high memory consumption.
    For now, this feature is just a primitive that can be exercised manually. The next obvious step is to make the compiler aware of it and pick it over ESP DOP when appropriate...

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-trafodion pull request: [TRAFODION-1421] Implement paral...

Posted by DaveBirdsall <gi...@git.apache.org>.
Github user DaveBirdsall commented on a diff in the pull request:

    https://github.com/apache/incubator-trafodion/pull/422#discussion_r59944770
  
    --- Diff: core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/ClientScanner98.java.tmpl ---
    @@ -0,0 +1,490 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hbase.client;
    +
    +import java.io.IOException;
    +import java.util.LinkedList;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.classification.InterfaceAudience;
    +import org.apache.hadoop.classification.InterfaceStability;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.DoNotRetryIOException;
    +import org.apache.hadoop.hbase.HBaseConfiguration;
    +import org.apache.hadoop.hbase.HConstants;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.KeyValueUtil;
    +import org.apache.hadoop.hbase.NotServingRegionException;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.UnknownScannerException;
    +import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
    +import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
    +import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    +import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
    +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
    +import org.apache.hadoop.hbase.util.Bytes;
    +
    +/**
    + * Implements the scanner interface for the HBase client.
    + * If there are multiple regions in a table, this scanner will iterate
    + * through them all.
    + */
    +@InterfaceAudience.Public
    +@InterfaceStability.Stable
    +public class ClientScanner98 extends AbstractClientScanner {
    +    private final Log LOG = LogFactory.getLog(this.getClass());
    +    protected Scan scan;
    +    protected boolean closed = false;
    +    // Current region scanner is against.  Gets cleared if current region goes
    +    // wonky: e.g. if it splits on us.
    +    protected HRegionInfo currentRegion = null;
    +    protected ScannerCallable callable = null;
    +    protected final LinkedList<Result> cache = new LinkedList<Result>();
    +    protected final int caching;
    +    protected long lastNext;
    +    // Keep lastResult returned successfully in case we have to reset scanner.
    +    protected Result lastResult = null;
    --- End diff --
    
    Should the close method reset lastResult to null to aid in garbage collection?
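    For illustration, the suggestion amounts to something like this hypothetical sketch (not code from the patch): nulling the cached reference in close() lets the last Result become collectible even if the scanner object itself stays reachable afterwards.

    ```java
    // Hypothetical sketch of the reviewer's suggestion; not code from the patch.
    public class ScannerCloseSketch {
        // stands in for the cached lastResult field of the scanner
        private Object lastResult = new Object();

        public void close() {
            // drop the reference so the cached result can be garbage collected
            // even while the scanner object itself remains reachable
            lastResult = null;
        }

        public boolean holdsResult() {
            return lastResult != null;
        }

        public static void main(String[] args) {
            ScannerCloseSketch s = new ScannerCloseSketch();
            s.close();
            System.out.println(s.holdsResult()); // false
        }
    }
    ```
    
    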



[GitHub] incubator-trafodion pull request: [TRAFODION-1421] Implement paral...

Posted by eowhadi <gi...@git.apache.org>.
Github user eowhadi commented on a diff in the pull request:

    https://github.com/apache/incubator-trafodion/pull/422#discussion_r59947160
  
    --- Diff: core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/ClientScanner98.java.tmpl ---
    @@ -0,0 +1,490 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hbase.client;
    +
    +import java.io.IOException;
    +import java.util.LinkedList;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.classification.InterfaceAudience;
    +import org.apache.hadoop.classification.InterfaceStability;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.DoNotRetryIOException;
    +import org.apache.hadoop.hbase.HBaseConfiguration;
    +import org.apache.hadoop.hbase.HConstants;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.KeyValueUtil;
    +import org.apache.hadoop.hbase.NotServingRegionException;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.UnknownScannerException;
    +import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
    +import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
    +import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    +import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
    +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
    +import org.apache.hadoop.hbase.util.Bytes;
    +
    +/**
    + * Implements the scanner interface for the HBase client.
    + * If there are multiple regions in a table, this scanner will iterate
    + * through them all.
    + */
    +@InterfaceAudience.Public
    +@InterfaceStability.Stable
    +public class ClientScanner98 extends AbstractClientScanner {
    +    private final Log LOG = LogFactory.getLog(this.getClass());
    +    protected Scan scan;
    +    protected boolean closed = false;
    +    // Current region scanner is against.  Gets cleared if current region goes
    +    // wonky: e.g. if it splits on us.
    +    protected HRegionInfo currentRegion = null;
    +    protected ScannerCallable callable = null;
    +    protected final LinkedList<Result> cache = new LinkedList<Result>();
    +    protected final int caching;
    +    protected long lastNext;
    +    // Keep lastResult returned successfully in case we have to reset scanner.
    +    protected Result lastResult = null;
    +    protected final long maxScannerResultSize;
    +    private final HConnection connection;
    +    private final TableName tableName;
    +    protected final int scannerTimeout;
    +    protected boolean scanMetricsPublished = false;
    +    protected RpcRetryingCaller<Result []> caller;
    +    protected RpcControllerFactory rpcControllerFactory;
    +
    +    /**
    +     * Create a new ClientScanner for the specified table. An HConnection will be
    +     * retrieved using the passed Configuration.
    +     * Note that the passed {@link Scan}'s start row may be changed.
    +     *
    +     * @param conf The {@link Configuration} to use.
    +     * @param scan {@link Scan} to use in this scanner
    +     * @param tableName The table that we wish to scan
    +     * @throws IOException
    +     */
    +    @Deprecated
    +    public ClientScanner98(final Configuration conf, final Scan scan,
    +        final TableName tableName) throws IOException {
    +      this(conf, scan, tableName, HConnectionManager.getConnection(conf));
    +    }
    +
    +    /**
    +     * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName)}
    +     */
    +    @Deprecated
    +    public ClientScanner98(final Configuration conf, final Scan scan,
    +        final byte [] tableName) throws IOException {
    +      this(conf, scan, TableName.valueOf(tableName));
    +    }
    +
    +
    +    /**
    +     * Create a new ClientScanner for the specified table
    +     * Note that the passed {@link Scan}'s start row may be changed.
    +     *
    +     * @param conf The {@link Configuration} to use.
    +     * @param scan {@link Scan} to use in this scanner
    +     * @param tableName The table that we wish to scan
    +     * @param connection Connection identifying the cluster
    +     * @throws IOException
    +     */
    +  public ClientScanner98(final Configuration conf, final Scan scan, final TableName tableName,
    +      HConnection connection) throws IOException {
    +    this(conf, scan, tableName, connection, RpcRetryingCallerFactory.instantiate(conf),
    +        RpcControllerFactory.instantiate(conf));
    +  }
    +
    +  /**
    +   * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName, HConnection)}
    +   */
    +  @Deprecated
    +  public ClientScanner98(final Configuration conf, final Scan scan, final byte [] tableName,
    +      HConnection connection) throws IOException {
    +    this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf),
    +        RpcControllerFactory.instantiate(conf));
    +  }
    +
    +  /**
    +   * @deprecated Use
    +   *             {@link #ClientScanner(Configuration, Scan, TableName, HConnection,
    +   *             RpcRetryingCallerFactory, RpcControllerFactory)}
    +   *             instead
    +   */
    +  @Deprecated
    +  public ClientScanner98(final Configuration conf, final Scan scan, final TableName tableName,
    +      HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
    +    this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf));
    +  }
    +
    +  /**
    +   * Create a new ClientScanner for the specified table. Note that the passed {@link Scan}'s
    +   * start row may be changed.
    +   * @param conf The {@link Configuration} to use.
    +   * @param scan {@link Scan} to use in this scanner
    +   * @param tableName The table that we wish to scan
    +   * @param connection Connection identifying the cluster
    +   * @throws IOException
    +   */
    +  public ClientScanner98(final Configuration conf, final Scan scan, final TableName tableName,
    +      HConnection connection, RpcRetryingCallerFactory rpcFactory,
    +      RpcControllerFactory controllerFactory) throws IOException {
    +      if (LOG.isTraceEnabled()) {
    +        LOG.trace("Scan table=" + tableName
    +            + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
    +      } 
    +      this.scan = scan;
    +      this.tableName = tableName;
    +      this.lastNext = System.currentTimeMillis();
    +      this.connection = connection;
    +      if (scan.getMaxResultSize() > 0) {
    +        this.maxScannerResultSize = scan.getMaxResultSize();
    +      } else {
    +        this.maxScannerResultSize = conf.getLong(
    +          HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
    +          HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
    +      }
    +      this.scannerTimeout = HBaseConfiguration.getInt(conf,
    +        HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
    +        HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
    +        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    +
    +      // check if application wants to collect scan metrics
    +      initScanMetrics(scan);
    +
    +      // Use the caching from the Scan.  If not set, use the default cache setting for this table.
    +      if (this.scan.getCaching() > 0) {
    +        this.caching = this.scan.getCaching();
    +      } else {
    +        this.caching = conf.getInt(
    +            HConstants.HBASE_CLIENT_SCANNER_CACHING,
    +            HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
    +      }
    +
    +      this.caller = rpcFactory.<Result[]> newCaller();
    +      this.rpcControllerFactory = controllerFactory;
    +
    +      initializeScannerInConstruction();
    +    }
    +
    +    protected void initializeScannerInConstruction() throws IOException{
    +      // initialize the scanner
    +      nextScanner(this.caching, false);
    +    }
    +
    +    protected HConnection getConnection() {
    +      return this.connection;
    +    }
    +
    +    /**
    +     * @return Table name
    +     * @deprecated Since 0.96.0; use {@link #getTable()}
    +     */
    +    @Deprecated
    +    protected byte [] getTableName() {
    +      return this.tableName.getName();
    +    }
    +
    +    protected TableName getTable() {
    +      return this.tableName;
    +    }
    +
    +    protected Scan getScan() {
    +      return scan;
    +    }
    +
    +    protected long getTimestamp() {
    +      return lastNext;
    +    }
    +
    +    // returns true if the passed region endKey is at or past the scan's stop row
    +    protected boolean checkScanStopRow(final byte [] endKey) {
    +      if (this.scan.getStopRow().length > 0) {
    +        // there is a stop row, check to see if we are past it.
    +        byte [] stopRow = scan.getStopRow();
    +        int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
    +          endKey, 0, endKey.length);
    +        if (cmp <= 0) {
    +          // stopRow <= endKey (endKey is equal to or larger than stopRow)
    +          // This is a stop.
    +          return true;
    +        }
    +      }
    +      return false; //unlikely.
    --- End diff --
    
    I did not try to understand or alter the ClientScanner code, so I can't speak to this comment. This code comes from an earlier version of HBase's ClientScanner; it was just renamed to ClientScanner98, with the missing overrides added to comply with the newer abstract class in HBase 1.0.



[GitHub] incubator-trafodion pull request: [TRAFODION-1421] Implement paral...

Posted by eowhadi <gi...@git.apache.org>.
Github user eowhadi commented on a diff in the pull request:

    https://github.com/apache/incubator-trafodion/pull/422#discussion_r59946858
  
    --- Diff: core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/ClientScanner98.java.tmpl ---
    @@ -0,0 +1,490 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hbase.client;
    +
    +import java.io.IOException;
    +import java.util.LinkedList;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.classification.InterfaceAudience;
    +import org.apache.hadoop.classification.InterfaceStability;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.DoNotRetryIOException;
    +import org.apache.hadoop.hbase.HBaseConfiguration;
    +import org.apache.hadoop.hbase.HConstants;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.KeyValueUtil;
    +import org.apache.hadoop.hbase.NotServingRegionException;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.UnknownScannerException;
    +import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
    +import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
    +import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    +import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
    +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
    +import org.apache.hadoop.hbase.util.Bytes;
    +
    +/**
    + * Implements the scanner interface for the HBase client.
    + * If there are multiple regions in a table, this scanner will iterate
    + * through them all.
    + */
    +@InterfaceAudience.Public
    +@InterfaceStability.Stable
    +public class ClientScanner98 extends AbstractClientScanner {
    +    private final Log LOG = LogFactory.getLog(this.getClass());
    +    protected Scan scan;
    +    protected boolean closed = false;
    +    // Current region scanner is against.  Gets cleared if current region goes
    +    // wonky: e.g. if it splits on us.
    +    protected HRegionInfo currentRegion = null;
    +    protected ScannerCallable callable = null;
    +    protected final LinkedList<Result> cache = new LinkedList<Result>();
    +    protected final int caching;
    +    protected long lastNext;
    +    // Keep lastResult returned successfully in case we have to reset scanner.
    +    protected Result lastResult = null;
    --- End diff --
    
    The whole ClientScanner98 object will be de-referenced, so GC will reclaim lastResult, as it is a member of ClientScanner98. This code is a quasi copy-paste of the existing client scanner code, so I would like to keep the changes to the strict minimum (just enough to make sure it compiles with the newer HBase version...).



[GitHub] incubator-trafodion pull request: [TRAFODION-1421] Implement paral...

Posted by DaveBirdsall <gi...@git.apache.org>.
Github user DaveBirdsall commented on a diff in the pull request:

    https://github.com/apache/incubator-trafodion/pull/422#discussion_r59945379
  
    --- Diff: core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/ClientScanner98.java.tmpl ---
    @@ -0,0 +1,490 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hbase.client;
    +
    +import java.io.IOException;
    +import java.util.LinkedList;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.classification.InterfaceAudience;
    +import org.apache.hadoop.classification.InterfaceStability;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.DoNotRetryIOException;
    +import org.apache.hadoop.hbase.HBaseConfiguration;
    +import org.apache.hadoop.hbase.HConstants;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.KeyValueUtil;
    +import org.apache.hadoop.hbase.NotServingRegionException;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.UnknownScannerException;
    +import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
    +import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
    +import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    +import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
    +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
    +import org.apache.hadoop.hbase.util.Bytes;
    +
    +/**
    + * Implements the scanner interface for the HBase client.
    + * If there are multiple regions in a table, this scanner will iterate
    + * through them all.
    + */
    +@InterfaceAudience.Public
    +@InterfaceStability.Stable
    +public class ClientScanner98 extends AbstractClientScanner {
    +    private final Log LOG = LogFactory.getLog(this.getClass());
    +    protected Scan scan;
    +    protected boolean closed = false;
    +    // Current region scanner is against.  Gets cleared if current region goes
    +    // wonky: e.g. if it splits on us.
    +    protected HRegionInfo currentRegion = null;
    +    protected ScannerCallable callable = null;
    +    protected final LinkedList<Result> cache = new LinkedList<Result>();
    +    protected final int caching;
    +    protected long lastNext;
    +    // Keep lastResult returned successfully in case we have to reset scanner.
    +    protected Result lastResult = null;
    +    protected final long maxScannerResultSize;
    +    private final HConnection connection;
    +    private final TableName tableName;
    +    protected final int scannerTimeout;
    +    protected boolean scanMetricsPublished = false;
    +    protected RpcRetryingCaller<Result []> caller;
    +    protected RpcControllerFactory rpcControllerFactory;
    +
    +    /**
    +     * Create a new ClientScanner for the specified table. An HConnection will be
    +     * retrieved using the passed Configuration.
    +     * Note that the passed {@link Scan}'s start row may be changed.
    +     *
    +     * @param conf The {@link Configuration} to use.
    +     * @param scan {@link Scan} to use in this scanner
    +     * @param tableName The table that we wish to scan
    +     * @throws IOException
    +     */
    +    @Deprecated
    +    public ClientScanner98(final Configuration conf, final Scan scan,
    +        final TableName tableName) throws IOException {
    +      this(conf, scan, tableName, HConnectionManager.getConnection(conf));
    +    }
    +
    +    /**
    +     * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName)}
    +     */
    +    @Deprecated
    +    public ClientScanner98(final Configuration conf, final Scan scan,
    +        final byte [] tableName) throws IOException {
    +      this(conf, scan, TableName.valueOf(tableName));
    +    }
    +
    +
    +    /**
    +     * Create a new ClientScanner for the specified table
    +     * Note that the passed {@link Scan}'s start row may be changed.
    +     *
    +     * @param conf The {@link Configuration} to use.
    +     * @param scan {@link Scan} to use in this scanner
    +     * @param tableName The table that we wish to scan
    +     * @param connection Connection identifying the cluster
    +     * @throws IOException
    +     */
    +  public ClientScanner98(final Configuration conf, final Scan scan, final TableName tableName,
    +      HConnection connection) throws IOException {
    +    this(conf, scan, tableName, connection, RpcRetryingCallerFactory.instantiate(conf),
    +        RpcControllerFactory.instantiate(conf));
    +  }
    +
    +  /**
    +   * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName, HConnection)}
    +   */
    +  @Deprecated
    +  public ClientScanner98(final Configuration conf, final Scan scan, final byte [] tableName,
    +      HConnection connection) throws IOException {
    +    this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf),
    +        RpcControllerFactory.instantiate(conf));
    +  }
    +
    +  /**
    +   * @deprecated Use
    +   *             {@link #ClientScanner(Configuration, Scan, TableName, HConnection,
    +   *             RpcRetryingCallerFactory, RpcControllerFactory)}
    +   *             instead
    +   */
    +  @Deprecated
    +  public ClientScanner98(final Configuration conf, final Scan scan, final TableName tableName,
    +      HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
    +    this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf));
    +  }
    +
    +  /**
    +   * Create a new ClientScanner for the specified table. Note that the passed {@link Scan}'s
    +   * start row may be changed.
    +   * @param conf The {@link Configuration} to use.
    +   * @param scan {@link Scan} to use in this scanner
    +   * @param tableName The table that we wish to scan
    +   * @param connection Connection identifying the cluster
    +   * @throws IOException
    +   */
    +  public ClientScanner98(final Configuration conf, final Scan scan, final TableName tableName,
    +      HConnection connection, RpcRetryingCallerFactory rpcFactory,
    +      RpcControllerFactory controllerFactory) throws IOException {
    +      if (LOG.isTraceEnabled()) {
    +        LOG.trace("Scan table=" + tableName
    +            + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
    +      } 
    +      this.scan = scan;
    +      this.tableName = tableName;
    +      this.lastNext = System.currentTimeMillis();
    +      this.connection = connection;
    +      if (scan.getMaxResultSize() > 0) {
    +        this.maxScannerResultSize = scan.getMaxResultSize();
    +      } else {
    +        this.maxScannerResultSize = conf.getLong(
    +          HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
    +          HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
    +      }
    +      this.scannerTimeout = HBaseConfiguration.getInt(conf,
    +        HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
    +        HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
    +        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    +
    +      // check if application wants to collect scan metrics
    +      initScanMetrics(scan);
    +
    +      // Use the caching from the Scan.  If not set, use the default cache setting for this table.
    +      if (this.scan.getCaching() > 0) {
    +        this.caching = this.scan.getCaching();
    +      } else {
    +        this.caching = conf.getInt(
    +            HConstants.HBASE_CLIENT_SCANNER_CACHING,
    +            HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
    +      }
    +
    +      this.caller = rpcFactory.<Result[]> newCaller();
    +      this.rpcControllerFactory = controllerFactory;
    +
    +      initializeScannerInConstruction();
    +    }
    +
    +    protected void initializeScannerInConstruction() throws IOException{
    +      // initialize the scanner
    +      nextScanner(this.caching, false);
    +    }
    +
    +    protected HConnection getConnection() {
    +      return this.connection;
    +    }
    +
    +    /**
    +     * @return Table name
    +     * @deprecated Since 0.96.0; use {@link #getTable()}
    +     */
    +    @Deprecated
    +    protected byte [] getTableName() {
    +      return this.tableName.getName();
    +    }
    +
    +    protected TableName getTable() {
    +      return this.tableName;
    +    }
    +
    +    protected Scan getScan() {
    +      return scan;
    +    }
    +
    +    protected long getTimestamp() {
    +      return lastNext;
    +    }
    +
    +    // returns true if the passed region endKey is at or past the scan's stop row
    +    protected boolean checkScanStopRow(final byte [] endKey) {
    +      if (this.scan.getStopRow().length > 0) {
    +        // there is a stop row, check to see if we are past it.
    +        byte [] stopRow = scan.getStopRow();
    +        int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
    +          endKey, 0, endKey.length);
    +        if (cmp <= 0) {
    +          // stopRow <= endKey (endKey is equals to or larger than stopRow)
    +          // This is a stop.
    +          return true;
    +        }
    +      }
    +      return false; //unlikely.
    --- End diff --
    
    Why is false unlikely? Are we past the end key almost all the time? (Seems counter-intuitive.)
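For reference, the comparison the question is about can be exercised standalone. This is an illustrative sketch, not code from the patch: it mimics HBase's `Bytes.compareTo` (unsigned lexicographic byte comparison) with `java.util.Arrays.compareUnsigned`, and the class and method names here are made up for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative sketch (not from the patch): the stop-row test in
// checkScanStopRow() is an unsigned lexicographic comparison, which
// Arrays.compareUnsigned reproduces for byte arrays (Java 9+).
public class StopRowCheckSketch {
    // Returns true when stopRow <= regionEndKey, i.e. the scan can stop
    // at the current region; an empty stop row means "scan to table end".
    static boolean checkScanStopRow(byte[] stopRow, byte[] endKey) {
        if (stopRow.length > 0) {
            return Arrays.compareUnsigned(stopRow, endKey) <= 0;
        }
        return false; // no stop row set: never stop early
    }

    public static void main(String[] args) {
        byte[] endKey = "m".getBytes(StandardCharsets.UTF_8);
        // stop row "f" sorts before end key "m": scan stops in this region
        System.out.println(checkScanStopRow("f".getBytes(StandardCharsets.UTF_8), endKey)); // true
        // stop row "z" sorts after "m": keep scanning into later regions
        System.out.println(checkScanStopRow("z".getBytes(StandardCharsets.UTF_8), endKey)); // false
        // empty stop row: scan the whole table
        System.out.println(checkScanStopRow(new byte[0], endKey)); // false
    }
}
```

Seen this way, `false` is the result whenever the scan still has regions left to cover (or has no stop row at all), which is the common case mid-scan.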


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-trafodion pull request: [TRAFODION-1421] Implement paral...

Posted by DaveBirdsall <gi...@git.apache.org>.
Github user DaveBirdsall commented on a diff in the pull request:

    https://github.com/apache/incubator-trafodion/pull/422#discussion_r59944350
  
    --- Diff: core/sqf/src/seatrans/.gitignore ---
    @@ -12,3 +12,6 @@ hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/Clean
     hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/KeyValueListScanner.java
     hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/MemoryUsageChore.java
     hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java
    +hbase-trx/src/main/java/org/apache/hadoop/hbase/client/ClientScanner98.java
    --- End diff --
    
    Is the name ClientScanner98 significant? For example, does it work only on HBase 0.98? Is there a different implementation needed for HBase 1.0?



[GitHub] incubator-trafodion pull request: [TRAFODION-1421] Implement paral...

Posted by DaveBirdsall <gi...@git.apache.org>.
Github user DaveBirdsall commented on a diff in the pull request:

    https://github.com/apache/incubator-trafodion/pull/422#discussion_r59944994
  
    --- Diff: core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/ClientScanner98.java.tmpl ---
    @@ -0,0 +1,490 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hbase.client;
    +
    +import java.io.IOException;
    +import java.util.LinkedList;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.classification.InterfaceAudience;
    +import org.apache.hadoop.classification.InterfaceStability;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.DoNotRetryIOException;
    +import org.apache.hadoop.hbase.HBaseConfiguration;
    +import org.apache.hadoop.hbase.HConstants;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.KeyValueUtil;
    +import org.apache.hadoop.hbase.NotServingRegionException;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.UnknownScannerException;
    +import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
    +import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
    +import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    +import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
    +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
    +import org.apache.hadoop.hbase.util.Bytes;
    +
    +/**
    + * Implements the scanner interface for the HBase client.
    + * If there are multiple regions in a table, this scanner will iterate
    + * through them all.
    + */
    +@InterfaceAudience.Public
    +@InterfaceStability.Stable
    +public class ClientScanner98 extends AbstractClientScanner {
    +    private final Log LOG = LogFactory.getLog(this.getClass());
    +    protected Scan scan;
    +    protected boolean closed = false;
    +    // Current region scanner is against.  Gets cleared if current region goes
    +    // wonky: e.g. if it splits on us.
    +    protected HRegionInfo currentRegion = null;
    +    protected ScannerCallable callable = null;
    +    protected final LinkedList<Result> cache = new LinkedList<Result>();
    +    protected final int caching;
    +    protected long lastNext;
    +    // Keep lastResult returned successfully in case we have to reset scanner.
    +    protected Result lastResult = null;
    +    protected final long maxScannerResultSize;
    +    private final HConnection connection;
    +    private final TableName tableName;
    +    protected final int scannerTimeout;
    +    protected boolean scanMetricsPublished = false;
    +    protected RpcRetryingCaller<Result []> caller;
    +    protected RpcControllerFactory rpcControllerFactory;
    +
    +    /**
    +     * Create a new ClientScanner for the specified table. An HConnection will be
    +     * retrieved using the passed Configuration.
    +     * Note that the passed {@link Scan}'s start row maybe changed changed.
    +     *
    +     * @param conf The {@link Configuration} to use.
    +     * @param scan {@link Scan} to use in this scanner
    +     * @param tableName The table that we wish to scan
    +     * @throws IOException
    +     */
    +    @Deprecated
    +    public ClientScanner98(final Configuration conf, final Scan scan,
    +        final TableName tableName) throws IOException {
    +      this(conf, scan, tableName, HConnectionManager.getConnection(conf));
    +    }
    +
    +    /**
    +     * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName)}
    +     */
    +    @Deprecated
    +    public ClientScanner98(final Configuration conf, final Scan scan,
    +        final byte [] tableName) throws IOException {
    +      this(conf, scan, TableName.valueOf(tableName));
    +    }
    +
    +
    +    /**
    +     * Create a new ClientScanner for the specified table
    +     * Note that the passed {@link Scan}'s start row maybe changed changed.
    +     *
    +     * @param conf The {@link Configuration} to use.
    +     * @param scan {@link Scan} to use in this scanner
    +     * @param tableName The table that we wish to scan
    +     * @param connection Connection identifying the cluster
    +     * @throws IOException
    +     */
    +  public ClientScanner98(final Configuration conf, final Scan scan, final TableName tableName,
    +      HConnection connection) throws IOException {
    +    this(conf, scan, tableName, connection, RpcRetryingCallerFactory.instantiate(conf),
    +        RpcControllerFactory.instantiate(conf));
    +  }
    +
    +  /**
    +   * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName, HConnection)}
    +   */
    +  @Deprecated
    +  public ClientScanner98(final Configuration conf, final Scan scan, final byte [] tableName,
    +      HConnection connection) throws IOException {
    +    this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf),
    +        RpcControllerFactory.instantiate(conf));
    +  }
    +
    +  /**
    +   * @deprecated Use
    +   *             {@link #ClientScanner(Configuration, Scan, TableName, HConnection,
    +   *             RpcRetryingCallerFactory, RpcControllerFactory)}
    +   *             instead
    +   */
    +  @Deprecated
    +  public ClientScanner98(final Configuration conf, final Scan scan, final TableName tableName,
    +      HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
    +    this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf));
    +  }
    +
    +  /**
    +   * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
    +   * row maybe changed changed.
    --- End diff --
    
    Typo in the comments ("changed changed")



[GitHub] incubator-trafodion pull request: [TRAFODION-1421] Implement paral...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-trafodion/pull/422



[GitHub] incubator-trafodion pull request: [TRAFODION-1421] Implement paral...

Posted by eowhadi <gi...@git.apache.org>.
Github user eowhadi commented on a diff in the pull request:

    https://github.com/apache/incubator-trafodion/pull/422#discussion_r59946150
  
    --- Diff: core/sqf/src/seatrans/.gitignore ---
    @@ -12,3 +12,6 @@ hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/Clean
     hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/KeyValueListScanner.java
     hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/MemoryUsageChore.java
     hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java
    +hbase-trx/src/main/java/org/apache/hadoop/hbase/client/ClientScanner98.java
    --- End diff --
    
    In 1.0 (though I discovered after the fact that early 1.0 releases didn't have it either), and even in later 0.98 versions, HBase introduced region server replicas. That feature makes the client scanner multithreaded, which does not work with the parallel scanner. So I froze an old, single-threaded version of the scanner that does not support replicas, for use with the parallel scanner. That is ClientScanner98. It works with our 1.x HBase version.
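As background, the PR description defines the DOP rule for the new CQD hbase_dop_parallel_scanner (0.0 = disabled; a value in (0, 1] is a fraction of the regions to scan; 2.0 and above is taken as a fixed thread count via ceiling). A minimal sketch of that rule follows; the class and method names are hypothetical, the rounding for the fractional case and the handling of values between 1.0 and 2.0 are my assumptions, not the patch's actual code.

```java
// Hypothetical sketch of the DOP rule described in the PR description
// for hbase_dop_parallel_scanner (not the patch's actual implementation):
//   0.0         -> disabled (DOP 1, plain sequential scan)
//   (0.0, 1.0]  -> DOP is that fraction of the regions to scan
//   >= 2.0      -> ceiling taken as a fixed thread count
public class ParallelScanDopSketch {
    static int computeDop(double cqd, int regionCount) {
        if (cqd <= 0.0) {
            return 1; // disabled: single-threaded scan
        }
        if (cqd <= 1.0) {
            // fraction of regions; rounding is an assumption here
            return Math.max(1, (int) Math.round(cqd * regionCount));
        }
        return (int) Math.ceil(cqd); // fixed thread count
    }

    public static void main(String[] args) {
        System.out.println(computeDop(0.0, 10)); // 1: disabled
        System.out.println(computeDop(0.3, 10)); // 3: 30% of 10 regions
        System.out.println(computeDop(2.5, 10)); // 3: ceiling of 2.5
    }
}
```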



[GitHub] incubator-trafodion pull request: [TRAFODION-1421] Implement paral...

Posted by DaveBirdsall <gi...@git.apache.org>.
Github user DaveBirdsall commented on a diff in the pull request:

    https://github.com/apache/incubator-trafodion/pull/422#discussion_r59947604
  
    --- Diff: core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/ClientScanner98.java.tmpl ---
    @@ -0,0 +1,490 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hbase.client;
    +
    +import java.io.IOException;
    +import java.util.LinkedList;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.classification.InterfaceAudience;
    +import org.apache.hadoop.classification.InterfaceStability;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.DoNotRetryIOException;
    +import org.apache.hadoop.hbase.HBaseConfiguration;
    +import org.apache.hadoop.hbase.HConstants;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.KeyValueUtil;
    +import org.apache.hadoop.hbase.NotServingRegionException;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.UnknownScannerException;
    +import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
    +import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
    +import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    +import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
    +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
    +import org.apache.hadoop.hbase.util.Bytes;
    +
    +/**
    + * Implements the scanner interface for the HBase client.
    + * If there are multiple regions in a table, this scanner will iterate
    + * through them all.
    + */
    +@InterfaceAudience.Public
    +@InterfaceStability.Stable
    +public class ClientScanner98 extends AbstractClientScanner {
    +    private final Log LOG = LogFactory.getLog(this.getClass());
    +    protected Scan scan;
    +    protected boolean closed = false;
    +    // Current region scanner is against.  Gets cleared if current region goes
    +    // wonky: e.g. if it splits on us.
    +    protected HRegionInfo currentRegion = null;
    +    protected ScannerCallable callable = null;
    +    protected final LinkedList<Result> cache = new LinkedList<Result>();
    +    protected final int caching;
    +    protected long lastNext;
    +    // Keep lastResult returned successfully in case we have to reset scanner.
    +    protected Result lastResult = null;
    +    protected final long maxScannerResultSize;
    +    private final HConnection connection;
    +    private final TableName tableName;
    +    protected final int scannerTimeout;
    +    protected boolean scanMetricsPublished = false;
    +    protected RpcRetryingCaller<Result []> caller;
    +    protected RpcControllerFactory rpcControllerFactory;
    +
    +    /**
    +     * Create a new ClientScanner for the specified table. An HConnection will be
    +     * retrieved using the passed Configuration.
    +     * Note that the passed {@link Scan}'s start row maybe changed changed.
    +     *
    +     * @param conf The {@link Configuration} to use.
    +     * @param scan {@link Scan} to use in this scanner
    +     * @param tableName The table that we wish to scan
    +     * @throws IOException
    +     */
    +    @Deprecated
    +    public ClientScanner98(final Configuration conf, final Scan scan,
    +        final TableName tableName) throws IOException {
    +      this(conf, scan, tableName, HConnectionManager.getConnection(conf));
    +    }
    +
    +    /**
    +     * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName)}
    +     */
    +    @Deprecated
    +    public ClientScanner98(final Configuration conf, final Scan scan,
    +        final byte [] tableName) throws IOException {
    +      this(conf, scan, TableName.valueOf(tableName));
    +    }
    +
    +
    +    /**
    +     * Create a new ClientScanner for the specified table
    +     * Note that the passed {@link Scan}'s start row maybe changed changed.
    +     *
    +     * @param conf The {@link Configuration} to use.
    +     * @param scan {@link Scan} to use in this scanner
    +     * @param tableName The table that we wish to scan
    +     * @param connection Connection identifying the cluster
    +     * @throws IOException
    +     */
    +  public ClientScanner98(final Configuration conf, final Scan scan, final TableName tableName,
    +      HConnection connection) throws IOException {
    +    this(conf, scan, tableName, connection, RpcRetryingCallerFactory.instantiate(conf),
    +        RpcControllerFactory.instantiate(conf));
    +  }
    +
    +  /**
    +   * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName, HConnection)}
    +   */
    +  @Deprecated
    +  public ClientScanner98(final Configuration conf, final Scan scan, final byte [] tableName,
    +      HConnection connection) throws IOException {
    +    this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf),
    +        RpcControllerFactory.instantiate(conf));
    +  }
    +
    +  /**
    +   * @deprecated Use
    +   *             {@link #ClientScanner(Configuration, Scan, TableName, HConnection,
    +   *             RpcRetryingCallerFactory, RpcControllerFactory)}
    +   *             instead
    +   */
    +  @Deprecated
    +  public ClientScanner98(final Configuration conf, final Scan scan, final TableName tableName,
    +      HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
    +    this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf));
    +  }
    +
    +  /**
    +   * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
    +   * row maybe changed changed.
    +   * @param conf The {@link Configuration} to use.
    +   * @param scan {@link Scan} to use in this scanner
    +   * @param tableName The table that we wish to scan
    +   * @param connection Connection identifying the cluster
    +   * @throws IOException
    +   */
    +  public ClientScanner98(final Configuration conf, final Scan scan, final TableName tableName,
    +      HConnection connection, RpcRetryingCallerFactory rpcFactory,
    +      RpcControllerFactory controllerFactory) throws IOException {
    +      if (LOG.isTraceEnabled()) {
    +        LOG.trace("Scan table=" + tableName
    +            + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
    +      } 
    +      this.scan = scan;
    +      this.tableName = tableName;
    +      this.lastNext = System.currentTimeMillis();
    +      this.connection = connection;
    +      if (scan.getMaxResultSize() > 0) {
    +        this.maxScannerResultSize = scan.getMaxResultSize();
    +      } else {
    +        this.maxScannerResultSize = conf.getLong(
    +          HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
    +          HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
    +      }
    +      this.scannerTimeout = HBaseConfiguration.getInt(conf,
    +        HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
    +        HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
    +        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    +
    +      // check if application wants to collect scan metrics
    +      initScanMetrics(scan);
    +
    +      // Use the caching from the Scan.  If not set, use the default cache setting for this table.
    +      if (this.scan.getCaching() > 0) {
    +        this.caching = this.scan.getCaching();
    +      } else {
    +        this.caching = conf.getInt(
    +            HConstants.HBASE_CLIENT_SCANNER_CACHING,
    +            HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
    +      }
    +
    +      this.caller = rpcFactory.<Result[]> newCaller();
    +      this.rpcControllerFactory = controllerFactory;
    +
    +      initializeScannerInConstruction();
    +    }
    +
    +    protected void initializeScannerInConstruction() throws IOException{
    +      // initialize the scanner
    +      nextScanner(this.caching, false);
    +    }
    +
    +    protected HConnection getConnection() {
    +      return this.connection;
    +    }
    +
    +    /**
    +     * @return Table name
    +     * @deprecated Since 0.96.0; use {@link #getTable()}
    +     */
    +    @Deprecated
    +    protected byte [] getTableName() {
    +      return this.tableName.getName();
    +    }
    +
    +    protected TableName getTable() {
    +      return this.tableName;
    +    }
    +
    +    protected Scan getScan() {
    +      return scan;
    +    }
    +
    +    protected long getTimestamp() {
    +      return lastNext;
    +    }
    +
    +    // returns true if the passed region endKey
    +    protected boolean checkScanStopRow(final byte [] endKey) {
    +      if (this.scan.getStopRow().length > 0) {
    +        // there is a stop row, check to see if we are past it.
    +        byte [] stopRow = scan.getStopRow();
    +        int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
    +          endKey, 0, endKey.length);
    +        if (cmp <= 0) {
    +          // stopRow <= endKey (endKey is equals to or larger than stopRow)
    +          // This is a stop.
    +          return true;
    +        }
    +      }
    +      return false; //unlikely.
    +    }
    +
    +    /*
    +     * Gets a scanner for the next region.  If this.currentRegion != null, then
    +     * we will move to the endrow of this.currentRegion.  Else we will get
    +     * scanner at the scan.getStartRow().  We will go no further, just tidy
    +     * up outstanding scanners, if <code>currentRegion != null</code> and
    +     * <code>done</code> is true.
    +     * @param nbRows
    +     * @param done Server-side says we're done scanning.
    +     */
    +  protected boolean nextScanner(int nbRows, final boolean done)
    +    throws IOException {
    +      // Close the previous scanner if it's open
    +      if (this.callable != null) {
    +        this.callable.setClose();
    +        this.caller.callWithRetries(callable,scannerTimeout);
    +        this.callable = null;
    +      }
    +
    +      // Where to start the next scanner
    +      byte [] localStartKey;
    +
    +      // if we're at end of table, close and return false to stop iterating
    +      if (this.currentRegion != null) {
    +        byte [] endKey = this.currentRegion.getEndKey();
    +        if (endKey == null ||
    +            Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
    +            checkScanStopRow(endKey) ||
    +            done) {
    +          close();
    +          if (LOG.isTraceEnabled()) {
    +            LOG.trace("Finished " + this.currentRegion);
    +          }
    +          return false;
    +        }
    +        localStartKey = endKey;
    +        if (LOG.isTraceEnabled()) {
    +          LOG.trace("Finished " + this.currentRegion);
    +        }
    +      } else {
    +        localStartKey = this.scan.getStartRow();
    +      }
    +
    +      if (LOG.isDebugEnabled() && this.currentRegion != null) {
    +        // Only worth logging if NOT first region in scan.
    +        LOG.debug("Advancing internal scanner to startKey at '" +
    +          Bytes.toStringBinary(localStartKey) + "'");
    +      }
    +      try {
    +        callable = getScannerCallable(localStartKey, nbRows);
    +        // Open a scanner on the region server starting at the
    +        // beginning of the region
    +        this.caller.callWithRetries(callable,scannerTimeout);
    +        this.currentRegion = callable.getHRegionInfo();
    +        if (this.scanMetrics != null) {
    +          this.scanMetrics.countOfRegions.incrementAndGet();
    +        }
    +      } catch (IOException e) {
    +        close();
    +        throw e;
    +      }
    +      return true;
    +    }
    +
    +    @InterfaceAudience.Private
    +    protected ScannerCallable getScannerCallable(byte [] localStartKey,
    +        int nbRows) {
    +      scan.setStartRow(localStartKey);
    +      ScannerCallable s =
    +              new ScannerCallable((ClusterConnection)getConnection(), getTable(), scan, this.scanMetrics,
    +                  this.rpcControllerFactory);
    +      s.setCaching(nbRows);
    +      return s;
    +    }
    +
    +    /**
    +     * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
    +     * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics
    +     * framework because it doesn't support multi-instances of the same metrics on the same machine;
    +     * for scan/map reduce scenarios, we will have multiple scans running at the same time.
    +     *
    +     * By default, scan metrics are disabled; if the application wants to collect them, this behavior
    +     * can be turned on by calling calling:
    +     *
    +     * scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE))
    +     */
    +    protected void writeScanMetrics() {
    +      if (this.scanMetrics == null || scanMetricsPublished) {
    +        return;
    +      }
    +      MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
    +      scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
    +      scanMetricsPublished = true;
    +    }
    +
    +    @Override
    +    public Result next() throws IOException {
    +      // If the scanner is closed and there's nothing left in the cache, next is a no-op.
    +      if (cache.size() == 0 && this.closed) {
    +        return null;
    +      }
    +      if (cache.size() == 0) {
    +        Result [] values = null;
    +        long remainingResultSize = maxScannerResultSize;
    +        int countdown = this.caching;
    +        // We need to reset it if it's a new callable that was created
    +        // with a countdown in nextScanner
    +        callable.setCaching(this.caching);
    +        // This flag is set when we want to skip the result returned.  We do
    +        // this when we reset scanner because it split under us.
    +        boolean skipFirst = false;
    +        boolean retryAfterOutOfOrderException  = true;
    +        do {
    +          try {
    +            if (skipFirst) {
    +              // Skip only the first row (which was the last row of the last
    +              // already-processed batch).
    +              callable.setCaching(1);
    +              values = this.caller.callWithRetries(callable,scannerTimeout);
    +              callable.setCaching(this.caching);
    +              skipFirst = false;
    +            }
    +            // Server returns a null values if scanning is to stop.  Else,
    +            // returns an empty array if scanning is to go on and we've just
    +            // exhausted current region.
    +            values = this.caller.callWithRetries(callable,scannerTimeout);
    +            if (skipFirst && values != null && values.length == 1) {
    +              skipFirst = false; // Already skipped, unset it before scanning again
    +              values = this.caller.callWithRetries(callable,scannerTimeout);
    +            }
    +            retryAfterOutOfOrderException  = true;
    +          } catch (DoNotRetryIOException e) {
    +            // DNRIOEs are thrown to make us break out of retries.  Some types of DNRIOEs want us
    +            // to reset the scanner and come back in again.
    +            if (e instanceof UnknownScannerException) {
    +              long timeout = lastNext + scannerTimeout;
    +              // If we are over the timeout, throw this exception to the client wrapped in
    +              // a ScannerTimeoutException. Else, it's because the region moved and we used the old
    +              // id against the new region server; reset the scanner.
    +              //if (timeout < System.currentTimeMillis()) {
    +                //long elapsed = System.currentTimeMillis() - lastNext;
    +                //ScannerTimeoutException ex = new ScannerTimeoutException(
    +                //    elapsed + "ms passed since the last invocation, " +
    +                //        "timeout is currently set to " + scannerTimeout);
    +                //ex.initCause(e);
    +                //throw ex;
    +              //}
    +            } else {
    +              // If exception is any but the list below throw it back to the client; else setup
    +              // the scanner and retry.
    +              Throwable cause = e.getCause();
    +              if ((cause != null && cause instanceof NotServingRegionException) ||
    +                (cause != null && cause instanceof RegionServerStoppedException) ||
    +                e instanceof OutOfOrderScannerNextException) {
    +                // Pass
    +                // It is easier writing the if loop test as list of what is allowed rather than
    +                // as a list of what is not allowed... so if in here, it means we do not throw.
    +              } else {
    +                throw e;
    +              }
    +            }
    +            // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
    +            if (this.lastResult != null) {
    +              // The region has moved. We need to open a brand new scanner at
    +              // the new location.
    +              // Reset the startRow to the row we've seen last so that the new
    +              // scanner starts at the correct row. Otherwise we may see previously
    +              // returned rows again.
    +              // (ScannerCallable by now has "relocated" the correct region)
    +              this.scan.setStartRow(this.lastResult.getRow());
    +
    +              // Skip first row returned.  We already let it out on previous
    +              // invocation.
    +              skipFirst = true;
    +            }
    +            if (e instanceof OutOfOrderScannerNextException) {
    +              if (retryAfterOutOfOrderException) {
    +                retryAfterOutOfOrderException = false;
    +              } else {
    +                // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
    +                throw new DoNotRetryIOException("Failed after retry of " +
    +                  "OutOfOrderScannerNextException: was there a rpc timeout?", e);
    --- End diff --
    
    Well, it does make for entertaining debugging :-)

