You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) was not preserved in this text rendering.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/11/18 03:37:04 UTC
[arrow-ballista] branch master updated: Ballista gets a docker image!!! (#521)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 453413f2 Ballista gets a docker image!!! (#521)
453413f2 is described below
commit 453413f21d5a94713bc132b4494d9d80f2494f56
Author: Brent Gardner <br...@spaceandtime.io>
AuthorDate: Thu Nov 17 20:36:58 2022 -0700
Ballista gets a docker image!!! (#521)
* Ballista gets a docker image!!!
* Enable flight sql
* Allow executing startup script
* Allow executing executables
* Clippy
---
.github/workflows/rust.yml | 13 ++++----
ballista/scheduler/Cargo.toml | 2 +-
ballista/scheduler/src/flight_sql.rs | 53 +++++++++++++++----------------
dev/docker/ballista-standalone.Dockerfile | 3 ++
dev/docker/standalone-entrypoint.sh | 0
5 files changed, 36 insertions(+), 35 deletions(-)
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 3f595114..0aac1120 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -145,7 +145,7 @@ jobs:
export PATH=$PATH:$HOME/d/protoc/bin
export ARROW_TEST_DATA=$(pwd)/testing/data
export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
- cargo test --features flight-sql
+ cargo test
cd examples
cargo run --example standalone_sql --features=ballista/standalone
env:
@@ -304,14 +304,15 @@ jobs:
- name: Build and push Docker image
run: |
echo "github user is $DOCKER_USER"
- export DOCKER_TAG="$(git describe --exact-match --tags $(git log -n1 --pretty='%h') || echo '0.10.0-test')"
- if [[ $DOCKER_TAG =~ ^[0-9\.]+$ ]]
+ docker build -t arrow-ballista-standalone:latest -f dev/docker/ballista-standalone.Dockerfile .
+ export DOCKER_TAG="$(git describe --exact-match --tags $(git log -n1 --pretty='%h') || echo '')"
+ if [[ $DOCKER_TAG =~ ^[0-9\.]+(?:-rc[0-9]+)?$ ]]
then
echo "publishing docker tag $DOCKER_TAG"
+ docker tag arrow-ballista-standalone:latest ghcr.io/apache/arrow-ballista-standalone:$DOCKER_TAG
+ docker login ghcr.io -u $DOCKER_USER -p "$DOCKER_PASS"
+ docker push ghcr.io/apache/arrow-ballista-standalone:$DOCKER_TAG
fi
- docker login ghcr.io -u $DOCKER_USER -p "$DOCKER_PASS"
- docker build -t ghcr.io/apache/arrow-ballista-standalone:$DOCKER_TAG -f dev/docker/ballista-standalone.Dockerfile .
- docker push ghcr.io/apache/arrow-ballista-standalone:$DOCKER_TAG
env:
DOCKER_USER: ${{ github.actor }}
DOCKER_PASS: ${{ secrets.GITHUB_TOKEN }}
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index 2956ee18..69242f53 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -30,7 +30,7 @@ edition = "2018"
scheduler = "scheduler_config_spec.toml"
[features]
-default = ["etcd", "sled", "prometheus-metrics"]
+default = ["etcd", "sled", "prometheus-metrics", "flight-sql"]
etcd = ["etcd-client"]
flight-sql = []
prometheus-metrics = ["prometheus", "once_cell"]
diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs
index a3c706ef..a5a613c4 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -89,6 +89,7 @@ impl FlightSqlServiceImpl {
}
}
+ #[allow(deprecated)]
fn tables(&self, ctx: Arc<SessionContext>) -> Result<RecordBatch, ArrowError> {
let schema = Arc::new(Schema::new(vec![
Field::new("catalog_name", DataType::Utf8, true),
@@ -96,7 +97,7 @@ impl FlightSqlServiceImpl {
Field::new("table_name", DataType::Utf8, false),
Field::new("table_type", DataType::Utf8, false),
]));
- let tables = ctx.tables()?;
+ let tables = ctx.tables()?; // resolved in #501
let names: Vec<_> = tables.iter().map(|it| Some(it.as_str())).collect();
let types: Vec<_> = names.iter().map(|_| Some("TABLE")).collect();
let cats: Vec<_> = names.iter().map(|_| None).collect();
@@ -104,7 +105,7 @@ impl FlightSqlServiceImpl {
let rb = RecordBatch::try_new(
schema,
[cats, schemas, names, types]
- .into_iter()
+ .iter()
.map(|i| Arc::new(StringArray::from(i.clone())) as ArrayRef)
.collect::<Vec<_>>(),
)?;
@@ -120,7 +121,7 @@ impl FlightSqlServiceImpl {
RecordBatch::try_new(
schema,
[TABLE_TYPES]
- .into_iter()
+ .iter()
.map(|i| Arc::new(StringArray::from(i.to_vec())) as ArrayRef)
.collect::<Vec<_>>(),
)
@@ -141,7 +142,7 @@ impl FlightSqlServiceImpl {
Status::internal(format!("Failed to create SessionContext: {:?}", e))
})?;
let handle = Uuid::new_v4();
- self.contexts.insert(handle.clone(), ctx);
+ self.contexts.insert(handle, ctx);
Ok(handle)
}
@@ -149,14 +150,14 @@ impl FlightSqlServiceImpl {
let auth = req
.metadata()
.get("authorization")
- .ok_or(Status::internal("No authorization header!"))?;
+ .ok_or_else(|| Status::internal("No authorization header!"))?;
let str = auth
.to_str()
.map_err(|e| Status::internal(format!("Error parsing header: {}", e)))?;
let authorization = str.to_string();
let bearer = "Bearer ";
if !authorization.starts_with(bearer) {
- Err(Status::internal(format!("Invalid auth header!")))?;
+ Err(Status::internal("Invalid auth header!"))?;
}
let auth = authorization[bearer.len()..].to_string();
@@ -249,17 +250,17 @@ impl FlightSqlServiceImpl {
.advertise_flight_sql_endpoint
{
Some(endpoint) => {
- let advertise_endpoint_vec: Vec<&str> = endpoint.split(":").collect();
+ let advertise_endpoint_vec: Vec<&str> = endpoint.split(':').collect();
match advertise_endpoint_vec.as_slice() {
[host_ip, port] => {
- (String::from(*host_ip), FromStr::from_str(*port).expect("Failed to parse port from advertise-endpoint."))
+ (String::from(*host_ip), FromStr::from_str(port).expect("Failed to parse port from advertise-endpoint."))
}
_ => {
Err(Status::internal("advertise-endpoint flag has incorrect format. Expected IP:Port".to_string()))?
}
}
}
- None => (exec_host.clone(), exec_port.clone()),
+ None => (exec_host.clone(), exec_port),
};
let fetch = if let Some(ref id) = loc.partition_id {
@@ -442,21 +443,19 @@ impl FlightSqlServiceImpl {
Response<Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send>>>,
Status,
> {
- let (tx, rx): (
- Sender<Result<FlightData, Status>>,
- Receiver<Result<FlightData, Status>>,
- ) = channel(2);
+ type FlightResult = Result<FlightData, Status>;
+ let (tx, rx): (Sender<FlightResult>, Receiver<FlightResult>) = channel(2);
let options = IpcWriteOptions::default();
let schema = SchemaAsIpc::new(rb.schema().as_ref(), &options).into();
tx.send(Ok(schema))
.await
- .map_err(|e| Status::internal("Error sending schema".to_string()))?;
- let (dict, flight) = flight_data_from_arrow_batch(&rb, &options);
+ .map_err(|_| Status::internal("Error sending schema".to_string()))?;
+ let (dict, flight) = flight_data_from_arrow_batch(rb, &options);
let flights = dict.into_iter().chain(std::iter::once(flight));
- for flight in flights.into_iter() {
+ for flight in flights {
tx.send(Ok(flight))
.await
- .map_err(|e| Status::internal("Error sending flight".to_string()))?;
+ .map_err(|_| Status::internal("Error sending flight".to_string()))?;
}
let resp = Response::new(Box::pin(ReceiverStream::new(rx))
as Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + 'static>>);
@@ -468,7 +467,7 @@ impl FlightSqlServiceImpl {
data: &RecordBatch,
name: &str,
) -> Result<Response<FlightInfo>, Status> {
- let num_bytes = batch_byte_size(&data) as i64;
+ let num_bytes = batch_byte_size(data) as i64;
let schema = data.schema();
let num_rows = data.num_rows() as i64;
@@ -499,7 +498,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
let authorization = request
.metadata()
.get("authorization")
- .ok_or(Status::invalid_argument("authorization field not present"))?
+ .ok_or_else(|| Status::invalid_argument("authorization field not present"))?
.to_str()
.map_err(|_| Status::invalid_argument("authorization not parsable"))?;
if !authorization.starts_with(basic) {
@@ -513,11 +512,9 @@ impl FlightSqlService for FlightSqlServiceImpl {
.map_err(|_| Status::invalid_argument("authorization not parsable"))?;
let str = String::from_utf8(bytes)
.map_err(|_| Status::invalid_argument("authorization not parsable"))?;
- let parts: Vec<_> = str.split(":").collect();
+ let parts: Vec<_> = str.split(':').collect();
if parts.len() != 2 {
- Err(Status::invalid_argument(format!(
- "Invalid authorization header"
- )))?;
+ Err(Status::invalid_argument("Invalid authorization header"))?;
}
let user = parts[0];
let pass = parts[1];
@@ -533,7 +530,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
};
let result = Ok(result);
let output = futures::stream::iter(vec![result]);
- let str = format!("Bearer {}", token.to_string());
+ let str = format!("Bearer {}", token);
let mut resp: Response<Pin<Box<dyn Stream<Item = Result<_, _>> + Send>>> =
Response::new(Box::pin(output));
let md = MetadataValue::try_from(str)
@@ -559,7 +556,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
let action: protobuf::Action = message
.unpack()
.map_err(|e| Status::internal(format!("{:?}", e)))?
- .ok_or(Status::internal("Expected an Action but got None!"))?;
+ .ok_or_else(|| Status::internal("Expected an Action but got None!"))?;
let fp = match &action.action_type {
Some(FetchPartition(fp)) => fp.clone(),
None => Err(Status::internal("Expected an ActionType but got None!"))?,
@@ -569,7 +566,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
match fp.job_id.as_str() {
"get_flight_info_table_types" => {
debug!("Responding with table types");
- let rb = FlightSqlServiceImpl::table_types().map_err(|e| {
+ let rb = FlightSqlServiceImpl::table_types().map_err(|_| {
Status::internal("Error getting table types".to_string())
})?;
let resp = Self::record_batch_to_resp(&rb).await?;
@@ -579,7 +576,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
debug!("Responding with tables");
let rb = self
.tables(ctx)
- .map_err(|e| Status::internal("Error getting tables".to_string()))?;
+ .map_err(|_| Status::internal("Error getting tables".to_string()))?;
let resp = Self::record_batch_to_resp(&rb).await?;
return Ok(resp);
}
@@ -675,7 +672,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
async fn get_flight_info_table_types(
&self,
_query: CommandGetTableTypes,
- request: Request<FlightDescriptor>,
+ _request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
debug!("get_flight_info_table_types");
let data = FlightSqlServiceImpl::table_types()
diff --git a/dev/docker/ballista-standalone.Dockerfile b/dev/docker/ballista-standalone.Dockerfile
index ade413d4..c1027f8d 100644
--- a/dev/docker/ballista-standalone.Dockerfile
+++ b/dev/docker/ballista-standalone.Dockerfile
@@ -33,6 +33,9 @@ RUN apt-get -qq update && apt-get install -qq -y nginx netcat wget
COPY target/$RELEASE_FLAG/ballista-scheduler /root/ballista-scheduler
COPY target/$RELEASE_FLAG/ballista-executor /root/ballista-executor
+RUN chmod a+x /root/ballista-scheduler && \
+ chmod a+x /root/ballista-executor
+
# populate some sample data for ListingSchemaProvider
RUN mkdir -p /data && \
wget -q https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-01.parquet -P /data/
diff --git a/dev/docker/standalone-entrypoint.sh b/dev/docker/standalone-entrypoint.sh
old mode 100644
new mode 100755