You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/11/18 03:37:04 UTC

[arrow-ballista] branch master updated: Ballista gets a docker image!!! (#521)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 453413f2 Ballista gets a docker image!!! (#521)
453413f2 is described below

commit 453413f21d5a94713bc132b4494d9d80f2494f56
Author: Brent Gardner <br...@spaceandtime.io>
AuthorDate: Thu Nov 17 20:36:58 2022 -0700

    Ballista gets a docker image!!! (#521)
    
    * Ballista gets a docker image!!!
    
    * Enable flight sql
    
    * Allow executing startup script
    
    * Allow executing executables
    
    * Clippy
---
 .github/workflows/rust.yml                | 13 ++++----
 ballista/scheduler/Cargo.toml             |  2 +-
 ballista/scheduler/src/flight_sql.rs      | 53 +++++++++++++++----------------
 dev/docker/ballista-standalone.Dockerfile |  3 ++
 dev/docker/standalone-entrypoint.sh       |  0
 5 files changed, 36 insertions(+), 35 deletions(-)

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 3f595114..0aac1120 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -145,7 +145,7 @@ jobs:
           export PATH=$PATH:$HOME/d/protoc/bin
           export ARROW_TEST_DATA=$(pwd)/testing/data
           export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
-          cargo test --features flight-sql
+          cargo test
           cd examples
           cargo run --example standalone_sql --features=ballista/standalone
         env:
@@ -304,14 +304,15 @@ jobs:
       - name: Build and push Docker image
         run: |
           echo "github user is $DOCKER_USER"
-          export DOCKER_TAG="$(git describe --exact-match --tags $(git log -n1 --pretty='%h') || echo '0.10.0-test')"
-          if [[ $DOCKER_TAG =~ ^[0-9\.]+$ ]]
+          docker build -t arrow-ballista-standalone:latest -f dev/docker/ballista-standalone.Dockerfile .
+          export DOCKER_TAG="$(git describe --exact-match --tags $(git log -n1 --pretty='%h') || echo '')"
+          if [[ $DOCKER_TAG =~ ^[0-9\.]+(?:-rc[0-9]+)?$ ]]
           then
             echo "publishing docker tag $DOCKER_TAG"
+            docker tag arrow-ballista-standalone:latest ghcr.io/apache/arrow-ballista-standalone:$DOCKER_TAG 
+            docker login ghcr.io -u $DOCKER_USER -p "$DOCKER_PASS"
+            docker push ghcr.io/apache/arrow-ballista-standalone:$DOCKER_TAG
           fi
-          docker login ghcr.io -u $DOCKER_USER -p "$DOCKER_PASS"
-          docker build -t ghcr.io/apache/arrow-ballista-standalone:$DOCKER_TAG -f dev/docker/ballista-standalone.Dockerfile .
-          docker push ghcr.io/apache/arrow-ballista-standalone:$DOCKER_TAG
         env:
           DOCKER_USER: ${{ github.actor }}
           DOCKER_PASS: ${{ secrets.GITHUB_TOKEN }}
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index 2956ee18..69242f53 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -30,7 +30,7 @@ edition = "2018"
 scheduler = "scheduler_config_spec.toml"
 
 [features]
-default = ["etcd", "sled", "prometheus-metrics"]
+default = ["etcd", "sled", "prometheus-metrics", "flight-sql"]
 etcd = ["etcd-client"]
 flight-sql = []
 prometheus-metrics = ["prometheus", "once_cell"]
diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs
index a3c706ef..a5a613c4 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -89,6 +89,7 @@ impl FlightSqlServiceImpl {
         }
     }
 
+    #[allow(deprecated)]
     fn tables(&self, ctx: Arc<SessionContext>) -> Result<RecordBatch, ArrowError> {
         let schema = Arc::new(Schema::new(vec![
             Field::new("catalog_name", DataType::Utf8, true),
@@ -96,7 +97,7 @@ impl FlightSqlServiceImpl {
             Field::new("table_name", DataType::Utf8, false),
             Field::new("table_type", DataType::Utf8, false),
         ]));
-        let tables = ctx.tables()?;
+        let tables = ctx.tables()?; // resolved in #501
         let names: Vec<_> = tables.iter().map(|it| Some(it.as_str())).collect();
         let types: Vec<_> = names.iter().map(|_| Some("TABLE")).collect();
         let cats: Vec<_> = names.iter().map(|_| None).collect();
@@ -104,7 +105,7 @@ impl FlightSqlServiceImpl {
         let rb = RecordBatch::try_new(
             schema,
             [cats, schemas, names, types]
-                .into_iter()
+                .iter()
                 .map(|i| Arc::new(StringArray::from(i.clone())) as ArrayRef)
                 .collect::<Vec<_>>(),
         )?;
@@ -120,7 +121,7 @@ impl FlightSqlServiceImpl {
         RecordBatch::try_new(
             schema,
             [TABLE_TYPES]
-                .into_iter()
+                .iter()
                 .map(|i| Arc::new(StringArray::from(i.to_vec())) as ArrayRef)
                 .collect::<Vec<_>>(),
         )
@@ -141,7 +142,7 @@ impl FlightSqlServiceImpl {
                 Status::internal(format!("Failed to create SessionContext: {:?}", e))
             })?;
         let handle = Uuid::new_v4();
-        self.contexts.insert(handle.clone(), ctx);
+        self.contexts.insert(handle, ctx);
         Ok(handle)
     }
 
@@ -149,14 +150,14 @@ impl FlightSqlServiceImpl {
         let auth = req
             .metadata()
             .get("authorization")
-            .ok_or(Status::internal("No authorization header!"))?;
+            .ok_or_else(|| Status::internal("No authorization header!"))?;
         let str = auth
             .to_str()
             .map_err(|e| Status::internal(format!("Error parsing header: {}", e)))?;
         let authorization = str.to_string();
         let bearer = "Bearer ";
         if !authorization.starts_with(bearer) {
-            Err(Status::internal(format!("Invalid auth header!")))?;
+            Err(Status::internal("Invalid auth header!"))?;
         }
         let auth = authorization[bearer.len()..].to_string();
 
@@ -249,17 +250,17 @@ impl FlightSqlServiceImpl {
                 .advertise_flight_sql_endpoint
             {
                 Some(endpoint) => {
-                    let advertise_endpoint_vec: Vec<&str> = endpoint.split(":").collect();
+                    let advertise_endpoint_vec: Vec<&str> = endpoint.split(':').collect();
                     match advertise_endpoint_vec.as_slice() {
                         [host_ip, port] => {
-                            (String::from(*host_ip), FromStr::from_str(*port).expect("Failed to parse port from advertise-endpoint."))
+                            (String::from(*host_ip), FromStr::from_str(port).expect("Failed to parse port from advertise-endpoint."))
                         }
                         _ => {
                             Err(Status::internal("advertise-endpoint flag has incorrect format. Expected IP:Port".to_string()))?
                         }
                     }
                 }
-                None => (exec_host.clone(), exec_port.clone()),
+                None => (exec_host.clone(), exec_port),
             };
 
             let fetch = if let Some(ref id) = loc.partition_id {
@@ -442,21 +443,19 @@ impl FlightSqlServiceImpl {
         Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send>>>,
         Status,
     > {
-        let (tx, rx): (
-            Sender<Result<FlightData, Status>>,
-            Receiver<Result<FlightData, Status>>,
-        ) = channel(2);
+        type FlightResult = Result<FlightData, Status>;
+        let (tx, rx): (Sender<FlightResult>, Receiver<FlightResult>) = channel(2);
         let options = IpcWriteOptions::default();
         let schema = SchemaAsIpc::new(rb.schema().as_ref(), &options).into();
         tx.send(Ok(schema))
             .await
-            .map_err(|e| Status::internal("Error sending schema".to_string()))?;
-        let (dict, flight) = flight_data_from_arrow_batch(&rb, &options);
+            .map_err(|_| Status::internal("Error sending schema".to_string()))?;
+        let (dict, flight) = flight_data_from_arrow_batch(rb, &options);
         let flights = dict.into_iter().chain(std::iter::once(flight));
-        for flight in flights.into_iter() {
+        for flight in flights {
             tx.send(Ok(flight))
                 .await
-                .map_err(|e| Status::internal("Error sending flight".to_string()))?;
+                .map_err(|_| Status::internal("Error sending flight".to_string()))?;
         }
         let resp = Response::new(Box::pin(ReceiverStream::new(rx))
             as Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>);
@@ -468,7 +467,7 @@ impl FlightSqlServiceImpl {
         data: &RecordBatch,
         name: &str,
     ) -> Result<Response<FlightInfo>, Status> {
-        let num_bytes = batch_byte_size(&data) as i64;
+        let num_bytes = batch_byte_size(data) as i64;
         let schema = data.schema();
         let num_rows = data.num_rows() as i64;
 
@@ -499,7 +498,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
         let authorization = request
             .metadata()
             .get("authorization")
-            .ok_or(Status::invalid_argument("authorization field not present"))?
+            .ok_or_else(|| Status::invalid_argument("authorization field not present"))?
             .to_str()
             .map_err(|_| Status::invalid_argument("authorization not parsable"))?;
         if !authorization.starts_with(basic) {
@@ -513,11 +512,9 @@ impl FlightSqlService for FlightSqlServiceImpl {
             .map_err(|_| Status::invalid_argument("authorization not parsable"))?;
         let str = String::from_utf8(bytes)
             .map_err(|_| Status::invalid_argument("authorization not parsable"))?;
-        let parts: Vec<_> = str.split(":").collect();
+        let parts: Vec<_> = str.split(':').collect();
         if parts.len() != 2 {
-            Err(Status::invalid_argument(format!(
-                "Invalid authorization header"
-            )))?;
+            Err(Status::invalid_argument("Invalid authorization header"))?;
         }
         let user = parts[0];
         let pass = parts[1];
@@ -533,7 +530,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
         };
         let result = Ok(result);
         let output = futures::stream::iter(vec![result]);
-        let str = format!("Bearer {}", token.to_string());
+        let str = format!("Bearer {}", token);
         let mut resp: Response<Pin<Box<dyn Stream<Item = Result<_, _>> + Send>>> =
             Response::new(Box::pin(output));
         let md = MetadataValue::try_from(str)
@@ -559,7 +556,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
         let action: protobuf::Action = message
             .unpack()
             .map_err(|e| Status::internal(format!("{:?}", e)))?
-            .ok_or(Status::internal("Expected an Action but got None!"))?;
+            .ok_or_else(|| Status::internal("Expected an Action but got None!"))?;
         let fp = match &action.action_type {
             Some(FetchPartition(fp)) => fp.clone(),
             None => Err(Status::internal("Expected an ActionType but got None!"))?,
@@ -569,7 +566,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
         match fp.job_id.as_str() {
             "get_flight_info_table_types" => {
                 debug!("Responding with table types");
-                let rb = FlightSqlServiceImpl::table_types().map_err(|e| {
+                let rb = FlightSqlServiceImpl::table_types().map_err(|_| {
                     Status::internal("Error getting table types".to_string())
                 })?;
                 let resp = Self::record_batch_to_resp(&rb).await?;
@@ -579,7 +576,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
                 debug!("Responding with tables");
                 let rb = self
                     .tables(ctx)
-                    .map_err(|e| Status::internal("Error getting tables".to_string()))?;
+                    .map_err(|_| Status::internal("Error getting tables".to_string()))?;
                 let resp = Self::record_batch_to_resp(&rb).await?;
                 return Ok(resp);
             }
@@ -675,7 +672,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
     async fn get_flight_info_table_types(
         &self,
         _query: CommandGetTableTypes,
-        request: Request<FlightDescriptor>,
+        _request: Request<FlightDescriptor>,
     ) -> Result<Response<FlightInfo>, Status> {
         debug!("get_flight_info_table_types");
         let data = FlightSqlServiceImpl::table_types()
diff --git a/dev/docker/ballista-standalone.Dockerfile b/dev/docker/ballista-standalone.Dockerfile
index ade413d4..c1027f8d 100644
--- a/dev/docker/ballista-standalone.Dockerfile
+++ b/dev/docker/ballista-standalone.Dockerfile
@@ -33,6 +33,9 @@ RUN apt-get -qq update && apt-get install -qq -y nginx netcat wget
 COPY target/$RELEASE_FLAG/ballista-scheduler /root/ballista-scheduler
 COPY target/$RELEASE_FLAG/ballista-executor /root/ballista-executor
 
+RUN chmod a+x /root/ballista-scheduler && \
+    chmod a+x /root/ballista-executor
+
 # populate some sample data for ListingSchemaProvider
 RUN mkdir -p /data && \
     wget -q https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-01.parquet -P /data/
diff --git a/dev/docker/standalone-entrypoint.sh b/dev/docker/standalone-entrypoint.sh
old mode 100644
new mode 100755