You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@apex.apache.org by bhupeshchawda <gi...@git.apache.org> on 2016/08/09 06:18:24 UTC

[GitHub] apex-malhar pull request #358: APEXMALHAR-2172: Updates to JDBC Poll Input O...

GitHub user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/358#discussion_r74003272
  
    --- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
    @@ -286,271 +146,233 @@ public AbstractJdbcPollInputOperator()
       public void setup(OperatorContext context)
       {
         super.setup(context);
    -    spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
    +    intializeDSLContext();
    +    if (scanService == null) {
    +      scanService = Executors.newScheduledThreadPool(1);
    +    }
         execute = true;
    -    cause = new AtomicReference<Throwable>();
    -    emitQueue = new ArrayBlockingQueue<List<T>>(queueCapacity);
    -    this.context = context;
    +    emitQueue = new LinkedBlockingDeque<>(queueCapacity);
         operatorId = context.getId();
    +    windowManager.setup(context);
    +  }
     
    -    try {
    +  private void intializeDSLContext()
    +  {
    +    create = DSL.using(store.getConnection(), JDBCUtils.dialect(store.getDatabaseUrl()));
    +  }
    +
    +  @Override
    +  public void activate(OperatorContext context)
    +  {
    +    initializePreparedStatement();
    +    long largestRecoveryWindow = windowManager.getLargestRecoveryWindow();
    +    if (largestRecoveryWindow == Stateless.WINDOW_ID
    +        || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) {
    +      scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
    +    }
    +  }
     
    -      //If its a range query pass upper and lower bounds
    -      //If its a polling query pass only the lower bound
    -      if (getRangeQueryPair().getValue() != null) {
    -        ps = store.getConnection()
    -            .prepareStatement(
    -                JdbcMetaDataUtility.buildRangeQuery(getTableName(), getKey(), rangeQueryPair.getKey(),
    -                    rangeQueryPair.getValue()),
    -                java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    +  protected void initializePreparedStatement()
    +  {
    +    try {
    +      // If its a range query pass upper and lower bounds, If its a polling query pass only the lower bound
    +      if (isPollerPartition) {
    +        ps = store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), Integer.MAX_VALUE),
    +            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
           } else {
             ps = store.getConnection().prepareStatement(
    -            JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), rangeQueryPair.getKey()),
    -            java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    -        isPollable = true;
    +            buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())),
    +            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
           }
    -
         } catch (SQLException e) {
           LOG.error("Exception in initializing the range query for a given partition", e);
           throw new RuntimeException(e);
         }
     
    -    windowManager.setup(context);
    -    LOG.debug("super setup done...");
       }
     
       @Override
       public void beginWindow(long windowId)
       {
         currentWindowId = windowId;
    -
    -    isReplayed = false;
    -
         if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
           try {
             replay(currentWindowId);
    +        return;
           } catch (SQLException e) {
             LOG.error("Exception in replayed windows", e);
             throw new RuntimeException(e);
           }
         }
    -
    -    if (isReplayed && currentWindowId == windowManager.getLargestRecoveryWindow()) {
    -      try {
    -        if (!isPollable && rangeQueryPair.getValue() != null) {
    -
    -          ps = store.getConnection().prepareStatement(
    -              JdbcMetaDataUtility.buildGTRangeQuery(getTableName(), getKey(), previousUpperBound,
    -                  rangeQueryPair.getValue()),
    -              java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    -        } else {
    -          String bound = null;
    -          if (previousUpperBound == null) {
    -            bound = getRangeQueryPair().getKey();
    -          } else {
    -            bound = previousUpperBound;
    -          }
    -          ps = store.getConnection().prepareStatement(
    -              JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound),
    -              java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    -          isPollable = true;
    -        }
    -        isReplayed = false;
    -        LOG.debug("Prepared statement after re-initialization - {} ", ps.toString());
    -      } catch (SQLException e) {
    -        // TODO Auto-generated catch block
    -        throw new RuntimeException(e);
    -      }
    +    if (isPollerPartition) {
    +      updatePollQuery();
    +      isPolled = false;
         }
    +    lowerBound = lastEmittedRecord;
    +  }
     
    -    //Reset the pollable query with the updated upper and lower bounds
    -    if (isPollable) {
    +  private void updatePollQuery()
    +  {
    +    if ((lastPolledBound != lastEmittedRecord)) {
    +      if (lastEmittedRecord == null) {
    +        lastPolledBound = rangeQueryPair.getKey();
    +      } else {
    +        lastPolledBound = lastEmittedRecord;
    +      }
           try {
    -        String bound = null;
    -        if (previousUpperBound == null && highestPolled == null) {
    -          bound = getRangeQueryPair().getKey();
    -        } else {
    -          bound = highestPolled;
    -        }
    -        ps = store.getConnection().prepareStatement(
    -            JdbcMetaDataUtility.buildPollableQuery(getTableName(), getKey(), bound),
    -            java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    -        LOG.debug("Polling query {} {}", ps.toString(), currentWindowId);
    -        isPolled = false;
    +        ps = store.getConnection().prepareStatement(buildRangeQuery(lastPolledBound, Integer.MAX_VALUE),
    +            TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
           } catch (SQLException e) {
             throw new RuntimeException(e);
           }
         }
     
    -    lower = null;
    -    upper = null;
    -
    -    //Check if a thread is already active and start only if its no
    -    //Do not start the thread from setup, will conflict with the replay
    -    if (dbPoller == null && !isReplayed) {
    -      //If this is not a replayed state, reset the ps to highest read offset + 1, 
    -      //keep the upper bound as the one that was initialized after static partitioning
    -      LOG.info("Statement when re-initialized {}", ps.toString());
    -      dbPoller = new Thread(new DBPoller());
    -      dbPoller.start();
    -    }
       }
     
       @Override
       public void emitTuples()
       {
    -    if (isReplayed) {
    +    if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
           return;
         }
    -
    -    List<T> tuples;
    -
    -    if ((tuples = emitQueue.poll()) != null) {
    -      for (Object tuple : tuples) {
    -        if (lower == null) {
    -          lower = tuple.toString();
    -        }
    -        upper = tuple.toString();
    -        outputPort.emit((T)tuple);
    +    int pollSize = (emitQueue.size() < batchSize) ? emitQueue.size() : batchSize;
    +    while (pollSize-- > 0) {
    +      T obj = emitQueue.poll();
    +      if (obj != null) {
    +        emitTuple(obj);
           }
    +      lastEmittedRecord++;
         }
       }
     
    +  protected void emitTuple(T obj)
    +  {
    +    outputPort.emit(obj);
    +  }
    +
       @Override
       public void endWindow()
       {
         try {
           if (currentWindowId > windowManager.getLargestRecoveryWindow()) {
    -        if (currentWindowRecoveryState == null) {
    -          currentWindowRecoveryState = new MutablePair<String, String>();
    -        }
    -        if (lower != null && upper != null) {
    -          previousUpperBound = upper;
    -          isPolled = true;
    -        }
    -        MutablePair<String, String> windowBoundaryPair = new MutablePair<>(lower, upper);
    -        currentWindowRecoveryState = windowBoundaryPair;
    +        currentWindowRecoveryState = new MutablePair<>(lowerBound, lastEmittedRecord);
             windowManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
           }
         } catch (IOException e) {
           throw new RuntimeException("saving recovery", e);
         }
    -    currentWindowRecoveryState = new MutablePair<>();
    -  }
    -
    -  public int getPartitionCount()
    -  {
    -    return partitionCount;
    +    if (threadException != null) {
    +      store.disconnect();
    +      DTThrowable.rethrow(threadException.get());
    +    }
       }
     
    -  public void setPartitionCount(int partitionCount)
    +  @Override
    +  public void deactivate()
       {
    -    this.partitionCount = partitionCount;
    +    scanService.shutdownNow();
    +    store.disconnect();
       }
     
    -  @Override
    -  public void activate(Context cntxt)
    +  protected void pollRecords()
       {
    -    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID
    -        && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowManager.getLargestRecoveryWindow()) {
    -      // If it is a replay state, don't start any threads here
    +    if (isPolled) {
           return;
         }
    -  }
    -
    -  @Override
    -  public void deactivate()
    -  {
         try {
    -      if (dbPoller != null && dbPoller.isAlive()) {
    -        dbPoller.interrupt();
    -        dbPoller.join();
    +      ps.setFetchSize(getFetchSize());
    +      ResultSet result = ps.executeQuery();
    +      if (result.next()) {
    +        do {
    +          emitQueue.offer(getTuple(result));
    +        } while (result.next());
           }
    -    } catch (InterruptedException ex) {
    -      // log and ignore, ending execution anyway
    -      LOG.error("exception in poller thread: ", ex);
    -    }
    -  }
    -
    -  @Override
    -  public void handleIdleTime()
    -  {
    -    if (execute) {
    -      try {
    -        Thread.sleep(spinMillis);
    -      } catch (InterruptedException ie) {
    -        throw new RuntimeException(ie);
    +      isPolled = true;
    +    } catch (SQLException ex) {
    +      execute = false;
    +      threadException = new AtomicReference<Throwable>(ex);
    +    } finally {
    +      if (!isPollerPartition) {
    +        store.disconnect();
           }
    -    } else {
    -      LOG.error("Exception: ", cause);
    -      DTThrowable.rethrow(cause.get());
         }
       }
     
    +  public abstract T getTuple(ResultSet result);
    +
       protected void replay(long windowId) throws SQLException
       {
    -    isReplayed = true;
     
    -    MutablePair<String, String> recoveredData = new MutablePair<String, String>();
         try {
    -      recoveredData = (MutablePair<String, String>)windowManager.load(operatorId, windowId);
    -
    -      if (recoveredData != null) {
    -        //skip the window and return if there was no incoming data in the window
    -        if (recoveredData.left == null || recoveredData.right == null) {
    -          return;
    -        }
    +      MutablePair<Integer, Integer> recoveredData = (MutablePair<Integer, Integer>)windowManager.load(operatorId,
    +          windowId);
     
    -        if (recoveredData.right.equals(rangeQueryPair.getValue()) || recoveredData.right.equals(previousUpperBound)) {
    -          LOG.info("Matched so returning");
    -          return;
    -        }
    -
    -        JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator();
    -        jdbcPoller.setStore(store);
    -        jdbcPoller.setKey(getKey());
    -        jdbcPoller.setPartitionCount(getPartitionCount());
    -        jdbcPoller.setPollInterval(getPollInterval());
    -        jdbcPoller.setTableName(getTableName());
    -        jdbcPoller.setBatchSize(getBatchSize());
    -        isPollable = false;
    +      if (recoveredData != null && shouldReplayWindow(recoveredData)) {
    +        LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", windowId, recoveredData.left,
    +            recoveredData.right);
     
    -        LOG.debug("[Window ID -" + windowId + "," + recoveredData.left + "," + recoveredData.right + "]");
    +        ps = store.getConnection().prepareStatement(
    +            buildRangeQuery(recoveredData.left, (recoveredData.right - recoveredData.left)), TYPE_FORWARD_ONLY,
    +            CONCUR_READ_ONLY);
    +        LOG.info("Query formed to recover data - {}", ps.toString());
     
    -        jdbcPoller.setRangeQueryPair(new KeyValPair<String, String>(recoveredData.left, recoveredData.right));
    +        emitReplayedTuples(ps);
     
    -        jdbcPoller.ps = jdbcPoller.store.getConnection().prepareStatement(
    -            JdbcMetaDataUtility.buildRangeQuery(jdbcPoller.getTableName(), jdbcPoller.getKey(),
    -                jdbcPoller.getRangeQueryPair().getKey(), jdbcPoller.getRangeQueryPair().getValue()),
    -            java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
    -        LOG.info("Query formed for recovered data - {}", jdbcPoller.ps.toString());
    +      }
     
    -        emitReplayedTuples(jdbcPoller.ps);
    +      if (currentWindowId == windowManager.getLargestRecoveryWindow()) {
    +        try {
    +          if (!isPollerPartition && rangeQueryPair.getValue() != null) {
    +            ps = store.getConnection().prepareStatement(
    +                buildRangeQuery(lastEmittedRecord, (rangeQueryPair.getValue() - lastEmittedRecord)), TYPE_FORWARD_ONLY,
    +                CONCUR_READ_ONLY);
    +          } else {
    +            Integer bound = null;
    +            if (lastEmittedRecord == null) {
    +              bound = rangeQueryPair.getKey();
    +            } else {
    +              bound = lastEmittedRecord;
    +            }
    +            ps = store.getConnection().prepareStatement(buildRangeQuery(bound, Integer.MAX_VALUE), TYPE_FORWARD_ONLY,
    +                CONCUR_READ_ONLY);
    +          }
    +          scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
    +        } catch (SQLException e) {
    +          throw new RuntimeException(e);
    +        }
           }
         } catch (IOException e) {
           throw new RuntimeException("replay", e);
    --- End diff --
    
    Could this exception get a more descriptive message? "replay" alone does not say which window (e.g. the window ID) failed to recover.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not
working, please contact infrastructure at infrastructure@apache.org or file
a JIRA ticket with INFRA.
---