Posted to issues@tajo.apache.org by jinossy <gi...@git.apache.org> on 2014/08/25 07:43:38 UTC

[GitHub] tajo pull request: TAJO-1015: Add executionblock event in worker

GitHub user jinossy opened a pull request:

    https://github.com/apache/tajo/pull/124

    TAJO-1015: Add executionblock event in worker

    add ExecutionBlock start/stop events
    add a shareable ExecutionBlock context
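    A minimal sketch of how a worker might react to ExecutionBlock start/stop events, assuming a manager that keeps exactly one shared context per block id. All names here (BlockLifecycleSketch, BlockManager, SharedBlockContext, onStart, onStop) are hypothetical and are not Tajo's API; the classes this PR actually touches are ExecutionBlockContext and TaskRunnerManager.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: names are illustrative only, not Tajo's actual API.
final class BlockLifecycleSketch {

  /** Per-execution-block state shared by every task of that block (hypothetical). */
  static final class SharedBlockContext {
    final String blockId;
    private volatile boolean stopped = false;

    SharedBlockContext(String blockId) {
      this.blockId = blockId;
    }

    /** Marks the context as stopped; a real context would release shared resources here. */
    void stop() {
      stopped = true;
    }

    boolean isStopped() {
      return stopped;
    }
  }

  /** Worker-side manager that reacts to start/stop events (hypothetical). */
  static final class BlockManager {
    private final ConcurrentMap<String, SharedBlockContext> contexts = new ConcurrentHashMap<>();

    /** Start event: create the context once; later tasks of the same block reuse it. */
    SharedBlockContext onStart(String blockId) {
      return contexts.computeIfAbsent(blockId, SharedBlockContext::new);
    }

    /** Stop event: remove and stop the context; returns false if none was registered. */
    boolean onStop(String blockId) {
      SharedBlockContext ctx = contexts.remove(blockId);
      if (ctx == null) {
        return false;
      }
      ctx.stop();
      return true;
    }

    int activeBlocks() {
      return contexts.size();
    }
  }
}
```

    Using computeIfAbsent keeps the "create once per block" step atomic even if several task launches for the same block race each other.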

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jinossy/tajo TAJO-1015

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/tajo/pull/124.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #124
    
----
commit 0371386e9a296b5e343f1d38e2d45677e9d982d9
Author: jhkim <jh...@apache.org>
Date:   2014-08-22T11:42:32Z

    TAJO-1015: Add executionblock event in worker.

commit 26dbf81c1ecd065eb4612863bedd76972958fae7
Author: jhkim <jh...@apache.org>
Date:   2014-08-25T04:41:24Z

    Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into TAJO-1015
    
    Conflicts:
    	tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
    	tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
    	tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
    	tajo-core/src/main/java/org/apache/tajo/worker/Task.java

commit 08892028322764db5dac045bce9ca482760cb0d1
Author: jhkim <jh...@apache.org>
Date:   2014-08-25T05:41:37Z

    fixed unit test failure

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] tajo pull request: TAJO-1015: Add executionblock event in worker

Posted by hyunsik <gi...@git.apache.org>.
Github user hyunsik commented on the pull request:

    https://github.com/apache/tajo/pull/124#issuecomment-55517948
  
    +1
    I agree with this proposal. It allows tasks to share one execution context, and it will improve readability and make the logic simpler. Before committing it, please remove the commented-out lines.
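    The point about tasks sharing one execution context can be sketched as follows, loosely modeled on the completed/succeeded/killed/failed AtomicInteger counters in the quoted ExecutionBlockContext diff. The names SharedContextSketch, BlockCounters, Outcome, record and runTasks are hypothetical, and the "every 10th task fails" rule exists only to exercise the counters.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of per-block counters shared by many concurrently running tasks.
final class SharedContextSketch {

  enum Outcome { SUCCEEDED, FAILED, KILLED }

  /** One instance per execution block, shared by all of its tasks. */
  static final class BlockCounters {
    final AtomicInteger completed = new AtomicInteger();
    final AtomicInteger succeeded = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();
    final AtomicInteger killed = new AtomicInteger();

    /** Called by each task when it finishes; safe to call from many threads at once. */
    void record(Outcome outcome) {
      switch (outcome) {
        case SUCCEEDED: succeeded.incrementAndGet(); break;
        case FAILED:    failed.incrementAndGet();    break;
        case KILLED:    killed.incrementAndGet();    break;
      }
      completed.incrementAndGet();
    }
  }

  /** Runs n fake tasks on a thread pool, all reporting into one shared BlockCounters. */
  static BlockCounters runTasks(int n) {
    BlockCounters shared = new BlockCounters();
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int i = 0; i < n; i++) {
      final int taskNo = i;
      // Every 10th task "fails" so that more than one counter is exercised.
      pool.submit(() -> shared.record(taskNo % 10 == 0 ? Outcome.FAILED : Outcome.SUCCEEDED));
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("tasks did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for tasks", e);
    }
    return shared;
  }
}
```

    Because every task increments the same atomic counters, the block-level totals need no extra locking or a separate aggregation pass.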



[GitHub] tajo pull request: TAJO-1015: Add executionblock event in worker

Posted by jinossy <gi...@git.apache.org>.
Github user jinossy commented on the pull request:

    https://github.com/apache/tajo/pull/124#issuecomment-55211227
  
    I've rebased this pull request.



[GitHub] tajo pull request: TAJO-1015: Add executionblock event in worker

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/tajo/pull/124



[GitHub] tajo pull request: TAJO-1015: Add executionblock event in worker

Posted by hyunsik <gi...@git.apache.org>.
Github user hyunsik commented on the pull request:

    https://github.com/apache/tajo/pull/124#issuecomment-55072610
  
    I'm sorry for the late review. Could you rebase it against the latest revision? Once you do, I'll review it shortly.



[GitHub] tajo pull request: TAJO-1015: Add executionblock event in worker

Posted by hyunsik <gi...@git.apache.org>.
Github user hyunsik commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/124#discussion_r17517726
  
    --- Diff: tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---
    @@ -0,0 +1,449 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tajo.worker;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.LocalDirAllocator;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hadoop.util.ReflectionUtils;
    +import org.apache.tajo.ExecutionBlockId;
    +import org.apache.tajo.QueryUnitAttemptId;
    +import org.apache.tajo.TajoProtos;
    +import org.apache.tajo.conf.TajoConf;
    +import org.apache.tajo.engine.query.QueryContext;
    +import org.apache.tajo.ipc.QueryMasterProtocol;
    +import org.apache.tajo.rpc.NettyClientBase;
    +import org.apache.tajo.rpc.NullCallback;
    +import org.apache.tajo.rpc.RpcChannelFactory;
    +import org.apache.tajo.rpc.RpcConnectionPool;
    +import org.apache.tajo.storage.HashShuffleAppenderManager;
    +import org.apache.tajo.storage.StorageUtil;
    +import org.apache.tajo.util.Pair;
    +import org.apache.tajo.worker.event.TaskRunnerStartEvent;
    +import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
    +
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
    +
    +public class ExecutionBlockContext {
    +  /** class logger */
    +  private static final Log LOG = LogFactory.getLog(ExecutionBlockContext.class);
    +
    +  private TaskRunnerManager manager;
    +  public AtomicInteger completedTasksNum = new AtomicInteger();
    +  public AtomicInteger succeededTasksNum = new AtomicInteger();
    +  public AtomicInteger killedTasksNum = new AtomicInteger();
    +  public AtomicInteger failedTasksNum = new AtomicInteger();
    +
    +  private ClientSocketChannelFactory channelFactory;
    +  // for temporal or intermediate files
    +  private FileSystem localFS;
    +  // for input files
    +  private FileSystem defaultFS;
    +  private ExecutionBlockId executionBlockId;
    +  private QueryContext queryContext;
    +  private String plan;
    +
    +  private ExecutionBlockSharedResource resource;
    +
    +  private TajoQueryEngine queryEngine;
    +  private RpcConnectionPool connPool;
    +  private InetSocketAddress qmMasterAddr;
    +  private TajoConf systemConf;
    +  // for the doAs block
    +  private UserGroupInformation taskOwner;
    +
    +  private Reporter reporter;
    +
    +  private AtomicBoolean stop = new AtomicBoolean();
    +
    +  // It keeps all of the query unit attempts while a TaskRunner is running.
    +  private final ConcurrentMap<QueryUnitAttemptId, Task> tasks = Maps.newConcurrentMap();
    +
    +  private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap();
    +
    +  public ExecutionBlockContext(TaskRunnerManager manager, TaskRunnerStartEvent event, InetSocketAddress queryMaster)
    +      throws Throwable {
    +    this.manager = manager;
    +    this.executionBlockId = event.getExecutionBlockId();
    +    this.connPool = RpcConnectionPool.getPool(manager.getTajoConf());
    +    this.qmMasterAddr = queryMaster;
    +    this.systemConf = manager.getTajoConf();
    +    this.reporter = new Reporter();
    +    this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
    +    this.localFS = FileSystem.getLocal(systemConf);
    +
    +    // Setup QueryEngine according to the query plan
    +    // Here, we can setup row-based query engine or columnar query engine.
    +    this.queryEngine = new TajoQueryEngine(systemConf);
    +    this.queryContext = event.getQueryContext();
    +    this.plan = event.getPlan();
    +    this.resource = new ExecutionBlockSharedResource();
    +
    +    init();
    +  }
    +
    +  public void init() throws Throwable {
    +
    +    LOG.info("Tajo Root Dir: " + systemConf.getVar(TajoConf.ConfVars.ROOT_DIR));
    +    LOG.info("Worker Local Dir: " + systemConf.getVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR));
    +
    +    LOG.info("QueryMaster Address:" + qmMasterAddr);
    +
    +    UserGroupInformation.setConfiguration(systemConf);
    +    // TODO - 'load credential' should be implemented
    +    // Getting taskOwner
    +    UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(systemConf.getVar(TajoConf.ConfVars.USERNAME));
    +    //taskOwner.addToken(token);
    +
    +    // initialize MasterWorkerProtocol as an actual task owner.
    +//      this.client =
    --- End diff --
    
    Could you remove the commented-out lines?



[GitHub] tajo pull request: TAJO-1015: Add executionblock event in worker

Posted by jinossy <gi...@git.apache.org>.
Github user jinossy commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/124#discussion_r17531064
  
    
    done!

